You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/12/17 11:52:30 UTC

[rocketmq] branch enhancedTls updated: Reload ssl context when trust cert changed

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch enhancedTls
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/enhancedTls by this push:
     new a25d38c  Reload ssl context when trust cert changed
a25d38c is described below

commit a25d38c4b207a5dfcf4e57889d588e2d2f2375dd
Author: yukon <yu...@apache.org>
AuthorDate: Sun Dec 17 19:17:38 2017 +0800

    Reload ssl context when trust cert changed
---
 .../apache/rocketmq/broker/BrokerController.java   | 25 +++++++-
 .../apache/rocketmq/namesrv/NamesrvController.java | 25 +++++++-
 .../apache/rocketmq/srvutil/FileWatchService.java  | 48 ++++++++--------
 .../rocketmq/srvutil/FileWatchServiceTest.java     | 66 +++++++++++++---------
 4 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9823f76..9a3a56a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -396,10 +396,31 @@ public class BrokerController {
                 // Register a listener to reload SslContext
                 try {
                     fileWatchService = new FileWatchService(
-                        new String[] {TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath},
+                        new String[] {
+                            TlsSystemConfig.tlsServerCertPath,
+                            TlsSystemConfig.tlsServerKeyPath,
+                            TlsSystemConfig.tlsServerTrustCertPath
+                        },
                         new FileWatchService.Listener() {
+                            boolean certChanged, keyChanged = false;
                             @Override
-                            public void onChanged() {
+                            public void onChanged(String path) {
+                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+                                    reloadServerSslContext();
+                                }
+                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+                                    certChanged = true;
+                                }
+                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+                                    keyChanged = true;
+                                }
+                                if (certChanged && keyChanged) {
+                                    certChanged = false;
+                                    keyChanged = false;
+                                    reloadServerSslContext();
+                                }
+                            }
+                            private void reloadServerSslContext() {
                                 ((NettyRemotingServer) remotingServer).loadSslContext();
                                 ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                             }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 6ba8b38..844175c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -103,10 +103,31 @@ public class NamesrvController {
             // Register a listener to reload SslContext
             try {
                 fileWatchService = new FileWatchService(
-                    new String[] {TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath},
+                    new String[] {
+                        TlsSystemConfig.tlsServerCertPath,
+                        TlsSystemConfig.tlsServerKeyPath,
+                        TlsSystemConfig.tlsServerTrustCertPath
+                    },
                     new FileWatchService.Listener() {
+                        boolean certChanged, keyChanged = false;
                         @Override
-                        public void onChanged() {
+                        public void onChanged(String path) {
+                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+                                reloadServerSslContext();
+                            }
+                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+                                certChanged = true;
+                            }
+                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+                                keyChanged = true;
+                            }
+                            if (certChanged && keyChanged) {
+                                certChanged = false;
+                                keyChanged = false;
+                                reloadServerSslContext();
+                            }
+                        }
+                        private void reloadServerSslContext() {
                             ((NettyRemotingServer) remotingServer).loadSslContext();
                         }
                     });
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
index 099b027..bc68d6a 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
@@ -17,12 +17,16 @@
 
 package org.apache.rocketmq.srvutil;
 
+import com.google.common.base.Strings;
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -32,23 +36,23 @@ import org.slf4j.LoggerFactory;
 public class FileWatchService extends ServiceThread {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
-    private final String [] watchFiles;
-    private final boolean [] isFileChangedFlag;
-    private final String [] fileCurrentHash;
+    private final List<String> watchFiles;
+    private final List<String> fileCurrentHash;
     private final Listener listener;
     private static final int WATCH_INTERVAL = 500;
     private MessageDigest md = MessageDigest.getInstance("MD5");
 
-    public FileWatchService(final String [] watchFiles,
+    public FileWatchService(final String[] watchFiles,
         final Listener listener) throws Exception {
-        this.watchFiles = watchFiles;
         this.listener = listener;
-        this.isFileChangedFlag = new boolean[watchFiles.length];
-        this.fileCurrentHash = new String[watchFiles.length];
+        this.watchFiles = new ArrayList<>();
+        this.fileCurrentHash = new ArrayList<>();
 
         for (int i = 0; i < watchFiles.length; i++) {
-            isFileChangedFlag[i] = false;
-            fileCurrentHash[i] = hash(watchFiles[i]);
+            if (!Strings.isNullOrEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
+                this.watchFiles.add(watchFiles[i]);
+                this.fileCurrentHash.add(hash(watchFiles[i]));
+            }
         }
     }
 
@@ -65,20 +69,17 @@ public class FileWatchService extends ServiceThread {
             try {
                 this.waitForRunning(WATCH_INTERVAL);
 
-                boolean allFileChanged = true;
-                for (int i = 0; i < watchFiles.length; i++) {
-                    String newHash = hash(watchFiles[i]);
-                    if (!newHash.equals(fileCurrentHash[i])) {
-                        isFileChangedFlag[i] = true;
-                        fileCurrentHash[i] = newHash;
+                for (int i = 0; i < watchFiles.size(); i++) {
+                    String newHash;
+                    try {
+                        newHash = hash(watchFiles.get(i));
+                    } catch (Exception ignored) {
+                        log.warn(this.getServiceName() + " service has exception when calculate the file hash. ", ignored);
+                        continue;
                     }
-                    allFileChanged = allFileChanged && isFileChangedFlag[i];
-                }
-
-                if (allFileChanged) {
-                    listener.onChanged();
-                    for (int i = 0; i < isFileChangedFlag.length; i++) {
-                        isFileChangedFlag[i] = false;
+                    if (!newHash.equals(fileCurrentHash.get(i))) {
+                        fileCurrentHash.set(i, newHash);
+                        listener.onChanged(watchFiles.get(i));
                     }
                 }
             } catch (Exception e) {
@@ -98,7 +99,8 @@ public class FileWatchService extends ServiceThread {
     public interface Listener {
         /**
          * Will be called when the target files are changed
+         * @param path the changed file path
          */
-        void onChanged();
+        void onChanged(String path);
     }
 }
diff --git a/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java b/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
index 6b411db..791abcf 100644
--- a/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
+++ b/srvutil/src/main/test/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.srvutil;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.nio.file.NoSuchFileException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.junit.Rule;
@@ -30,7 +29,6 @@ import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Java6Assertions.failBecauseExceptionWasNotThrown;
 
 @RunWith(MockitoJUnitRunner.class)
 public class FileWatchServiceTest {
@@ -39,11 +37,12 @@ public class FileWatchServiceTest {
 
     @Test
     public void watchSingleFile() throws Exception {
-        File file = tempFolder.newFile();
+        final File file = tempFolder.newFile();
         final Semaphore waitSemaphore = new Semaphore(0);
         FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()}, new FileWatchService.Listener() {
             @Override
-            public void onChanged() {
+            public void onChanged(String path) {
+                assertThat(file.getAbsolutePath()).isEqualTo(path);
                 waitSemaphore.release();
             }
         });
@@ -54,29 +53,13 @@ public class FileWatchServiceTest {
     }
 
     @Test
-    public void watchSingleFile_NotExits() throws Exception {
-        File file = tempFolder.newFile();
-        final Semaphore waitSemaphore = new Semaphore(0);
-        try {
-            FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath() + 123}, new FileWatchService.Listener() {
-                @Override
-                public void onChanged() {
-                    waitSemaphore.release();
-                }
-            });
-            failBecauseExceptionWasNotThrown(NoSuchFileException.class);
-        } catch (Exception e) {
-            assertThat(e).isInstanceOf(NoSuchFileException.class);
-        }
-    }
-
-    @Test
     public void watchSingleFile_FileDeleted() throws Exception {
         File file = tempFolder.newFile();
         final Semaphore waitSemaphore = new Semaphore(0);
-        FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()}, new FileWatchService.Listener() {
+        FileWatchService fileWatchService = new FileWatchService(new String[] {file.getAbsolutePath()},
+            new FileWatchService.Listener() {
             @Override
-            public void onChanged() {
+            public void onChanged(String path) {
                 waitSemaphore.release();
             }
         });
@@ -91,22 +74,49 @@ public class FileWatchServiceTest {
     }
 
     @Test
-    public void watchTwoFiles_ModifyOne() throws Exception {
+    public void watchTwoFile_FileDeleted() throws Exception {
         File fileA = tempFolder.newFile();
         File fileB = tempFolder.newFile();
         final Semaphore waitSemaphore = new Semaphore(0);
         FileWatchService fileWatchService = new FileWatchService(
             new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
             new FileWatchService.Listener() {
+                @Override
+                public void onChanged(String path) {
+                    waitSemaphore.release();
+                }
+            });
+        fileWatchService.start();
+        fileA.delete();
+        boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+        assertThat(result).isFalse();
+        modifyFile(fileB);
+        result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+        assertThat(result).isTrue();
+        fileA.createNewFile();
+        modifyFile(fileA);
+        result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+        assertThat(result).isTrue();
+    }
+
+    @Test
+    public void watchTwoFiles_ModifyOne() throws Exception {
+        final File fileA = tempFolder.newFile();
+        File fileB = tempFolder.newFile();
+        final Semaphore waitSemaphore = new Semaphore(0);
+        FileWatchService fileWatchService = new FileWatchService(
+            new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
+            new FileWatchService.Listener() {
             @Override
-            public void onChanged() {
+            public void onChanged(String path) {
+                assertThat(path).isEqualTo(fileA.getAbsolutePath());
                 waitSemaphore.release();
             }
         });
         fileWatchService.start();
         modifyFile(fileA);
         boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
-        assertThat(result).isFalse();
+        assertThat(result).isTrue();
     }
 
     @Test
@@ -118,14 +128,14 @@ public class FileWatchServiceTest {
             new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
             new FileWatchService.Listener() {
                 @Override
-                public void onChanged() {
+                public void onChanged(String path) {
                     waitSemaphore.release();
                 }
             });
         fileWatchService.start();
         modifyFile(fileA);
         modifyFile(fileB);
-        boolean result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
+        boolean result = waitSemaphore.tryAcquire(2, 1000, TimeUnit.MILLISECONDS);
         assertThat(result).isTrue();
     }
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@rocketmq.apache.org" <co...@rocketmq.apache.org>'].