You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/12/17 11:52:30 UTC
[rocketmq] branch enhancedTls updated: Reload ssl context when trust cert changed
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch enhancedTls
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/enhancedTls by this push:
new a25d38c Reload ssl context when trust cert changed
a25d38c is described below
commit a25d38c4b207a5dfcf4e57889d588e2d2f2375dd
Author: yukon <yu...@apache.org>
AuthorDate: Sun Dec 17 19:17:38 2017 +0800
Reload ssl context when trust cert changed
---
.../apache/rocketmq/broker/BrokerController.java | 25 +++++++-
.../apache/rocketmq/namesrv/NamesrvController.java | 25 +++++++-
.../apache/rocketmq/srvutil/FileWatchService.java | 48 ++++++++--------
.../rocketmq/srvutil/FileWatchServiceTest.java | 66 +++++++++++++---------
4 files changed, 109 insertions(+), 55 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9823f76..9a3a56a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -396,10 +396,31 @@ public class BrokerController {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
- new String[] {TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath},
+ new String[] {
+ TlsSystemConfig.tlsServerCertPath,
+ TlsSystemConfig.tlsServerKeyPath,
+ TlsSystemConfig.tlsServerTrustCertPath
+ },
new FileWatchService.Listener() {
+ boolean certChanged, keyChanged = false;
@Override
- public void onChanged() {
+ public void onChanged(String path) {
+ if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+ reloadServerSslContext();
+ }
+ if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+ certChanged = true;
+ }
+ if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+ keyChanged = true;
+ }
+ if (certChanged && keyChanged) {
+ certChanged = false;
+ keyChanged = false;
+ reloadServerSslContext();
+ }
+ }
+ private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
((NettyRemotingServer) fastRemotingServer).loadSslContext();
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 6ba8b38..844175c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -103,10 +103,31 @@ public class NamesrvController {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
- new String[] {TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath},
+ new String[] {
+ TlsSystemConfig.tlsServerCertPath,
+ TlsSystemConfig.tlsServerKeyPath,
+ TlsSystemConfig.tlsServerTrustCertPath
+ },
new FileWatchService.Listener() {
+ boolean certChanged, keyChanged = false;
@Override
- public void onChanged() {
+ public void onChanged(String path) {
+ if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+ reloadServerSslContext();
+ }
+ if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+ certChanged = true;
+ }
+ if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+ keyChanged = true;
+ }
+ if (certChanged && keyChanged) {
+ certChanged = false;
+ keyChanged = false;
+ reloadServerSslContext();
+ }
+ }
+ private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
index 099b027..bc68d6a 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
@@ -17,12 +17,16 @@
package org.apache.rocketmq.srvutil;
+import com.google.common.base.Strings;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -32,23 +36,23 @@ import org.slf4j.LoggerFactory;
public class FileWatchService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
- private final String [] watchFiles;
- private final boolean [] isFileChangedFlag;
- private final String [] fileCurrentHash;
+ private final List<String> watchFiles;
+ private final List<String> fileCurrentHash;
private final Listener listener;
private static final int WATCH_INTERVAL = 500;
private MessageDigest md = MessageDigest.getInstance("MD5");
- public FileWatchService(final String [] watchFiles,
+ public FileWatchService(final String[] watchFiles,
final Listener listener) throws Exception {
- this.watchFiles = watchFiles;
this.listener = listener;
- this.isFileChangedFlag = new boolean[watchFiles.length];
- this.fileCurrentHash = new String[watchFiles.length];
+ this.watchFiles = new ArrayList<>();
+ this.fileCurrentHash = new ArrayList<>();
for (int i = 0; i < watchFiles.length; i++) {
- isFileChangedFlag[i] = false;
- fileCurrentHash[i] = hash(watchFiles[i]);
+ if (!Strings.isNullOrEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
+ this.watchFiles.add(watchFiles[i]);
+ this.fileCurrentHash.add(hash(watchFiles[i]));
+ }
}
}
@@ -65,20 +69,17 @@ public class FileWatchService extends ServiceThread {
try {
this.waitForRunning(WATCH_INTERVAL);
- boolean allFileChanged = true;
- for (int i = 0; i < watchFiles.length; i++) {
- String newHash = hash(watchFiles[i]);
- if (!newHash.equals(fileCurrentHash[i])) {
- isFileChangedFlag[i] = true;
- fileCurrentHash[i] = newHash;
+ for (int i = 0; i < watchFiles.size(); i++) {
+ String newHash;
+ try {
+ newHash = hash(watchFiles.get(i));
+ } catch (Exception ignored) {
+ log.warn(this.getServiceName() + " service has exception when calculate the file hash. ", ignored);
+ continue;
}
- allFileChanged = allFileChanged && isFileChangedFlag[i];
- }
-
- if (allFileChanged) {
- listener.onChanged();
- for (int i = 0; i < isFileChangedFlag.length; i++) {
- isFileChangedFlag[i] = false;
+ if (!newHash.equals(fileCurrentHash.get(i))) {
+ fileCurrentHash.set(i, newHash);
+ listener.onChanged(watchFiles.get(i));
}
}
} catch (Exception e) {
@@ -98,7 +99,8 @@ public class FileWatchService extends ServiceThread {
public interface Listener {
/**
* Will be called when the target files are changed
+ * @param path the changed file path
*/
- void onChanged();
+ void onChanged(String path);
}
}
diff --git a/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java b/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
index 6b411db..791abcf 100644
--- a/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
+++ b/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.srvutil;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
-import java.nio.file.NoSuchFileException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
@@ -30,7 +29,6 @@ import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Java6Assertions.failBecauseExceptionWasNotThrown;
@RunWith(MockitoJUnitRunner.class)
public class FileWatchServiceTest {
@@ -39,11 +37,12 @@ public class FileWatchServiceTest {
@Test
public void watchSingleFile() throws Exception {
- File file = tempFolder.newFile();
+ final File file = tempFolder.newFile();
final Semaphore waitSemaphore = new Semaphore(0);
FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()}, new FileWatchService.Listener() {
@Override
- public void onChanged() {
+ public void onChanged(String path) {
+ assertThat(file.getAbsolutePath()).isEqualTo(path);
waitSemaphore.release();
}
});
@@ -54,29 +53,13 @@ public class FileWatchServiceTest {
}
@Test
- public void watchSingleFile_NotExits() throws Exception {
- File file = tempFolder.newFile();
- final Semaphore waitSemaphore = new Semaphore(0);
- try {
- FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath() + 123}, new FileWatchService.Listener() {
- @Override
- public void onChanged() {
- waitSemaphore.release();
- }
- });
- failBecauseExceptionWasNotThrown(NoSuchFileException.class);
- } catch (Exception e) {
- assertThat(e).isInstanceOf(NoSuchFileException.class);
- }
- }
-
- @Test
public void watchSingleFile_FileDeleted() throws Exception {
File file = tempFolder.newFile();
final Semaphore waitSemaphore = new Semaphore(0);
- FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()}, new FileWatchService.Listener() {
+ FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()},
+ new FileWatchService.Listener() {
@Override
- public void onChanged() {
+ public void onChanged(String path) {
waitSemaphore.release();
}
});
@@ -91,22 +74,49 @@ public class FileWatchServiceTest {
}
@Test
- public void watchTwoFiles_ModifyOne() throws Exception {
+ public void watchTwoFile_FileDeleted() throws Exception {
File fileA = tempFolder.newFile();
File fileB = tempFolder.newFile();
final Semaphore waitSemaphore = new Semaphore(0);
FileWatchService fileWatchService = new FileWatchService(
new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
new FileWatchService.Listener() {
+ @Override
+ public void onChanged(String path) {
+ waitSemaphore.release();
+ }
+ });
+ fileWatchService.start();
+ fileA.delete();
+ boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+ assertThat(result).isFalse();
+ modifyFile(fileB);
+ result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+ assertThat(result).isTrue();
+ fileA.createNewFile();
+ modifyFile(fileA);
+ result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+ assertThat(result).isTrue();
+ }
+
+ @Test
+ public void watchTwoFiles_ModifyOne() throws Exception {
+ final File fileA = tempFolder.newFile();
+ File fileB = tempFolder.newFile();
+ final Semaphore waitSemaphore = new Semaphore(0);
+ FileWatchService fileWatchService = new FileWatchService(
+ new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
+ new FileWatchService.Listener() {
@Override
- public void onChanged() {
+ public void onChanged(String path) {
+ assertThat(path).isEqualTo(fileA.getAbsolutePath());
waitSemaphore.release();
}
});
fileWatchService.start();
modifyFile(fileA);
boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
- assertThat(result).isFalse();
+ assertThat(result).isTrue();
}
@Test
@@ -118,14 +128,14 @@ public class FileWatchServiceTest {
new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
new FileWatchService.Listener() {
@Override
- public void onChanged() {
+ public void onChanged(String path) {
waitSemaphore.release();
}
});
fileWatchService.start();
modifyFile(fileA);
modifyFile(fileB);
- boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+ boolean result = waitSemaphore.tryAcquire(2, 1000, TimeUnit.MILLISECONDS);
assertThat(result).isTrue();
}
--
To stop receiving notification emails like this one, please contact
"commits@rocketmq.apache.org" <co...@rocketmq.apache.org>.