You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/06/14 04:59:53 UTC

[1/2] incubator-rocketmq git commit: Add TLS

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/tls [created] 1cf9099e9


Add TLS


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/66b5c724
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/66b5c724
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/66b5c724

Branch: refs/heads/tls
Commit: 66b5c7241651f54f7d483f99278cda537b5e4aac
Parents: c4a3e0c
Author: Li Zhanhui <li...@gmail.com>
Authored: Tue Jun 13 17:15:54 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Tue Jun 13 17:15:54 2017 +0800

----------------------------------------------------------------------
 .../apache/rocketmq/broker/BrokerStartup.java   |   1 +
 .../apache/rocketmq/client/ClientConfig.java    |  14 ++-
 .../client/impl/factory/MQClientInstance.java   |   1 +
 .../rocketmq/common/protocol/RequestCode.java   |   4 +-
 remoting/pom.xml                                |  18 +++
 .../rocketmq/remoting/RemotingClient.java       |  14 +--
 .../remoting/netty/FileRegionEncoder.java       |  76 ++++++++++++
 .../remoting/netty/NettyClientConfig.java       |  10 ++
 .../remoting/netty/NettyRemotingAbstract.java   |   6 +
 .../remoting/netty/NettyRemotingClient.java     |  37 ++++--
 .../remoting/netty/NettyRemotingServer.java     |  72 ++++++++++--
 .../remoting/netty/NettySystemConfig.java       |  16 ++-
 .../rocketmq/remoting/netty/SslHelper.java      | 115 +++++++++++++++++++
 .../protocol/RemotingSysRequestCode.java        |  26 +++++
 14 files changed, 385 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 85d2e3a..dbea561 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -95,6 +95,7 @@ public class BrokerStartup {
             final BrokerConfig brokerConfig = new BrokerConfig();
             final NettyServerConfig nettyServerConfig = new NettyServerConfig();
             final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+            nettyClientConfig.setUseTLS(NettySystemConfig.enableSSL);
             nettyServerConfig.setListenPort(10911);
             final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 950d756..8f255f0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -45,6 +45,8 @@ public class ClientConfig {
     private String unitName;
     private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
 
+    private boolean useTLS;
+
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
         sb.append(this.getClientIP());
@@ -92,6 +94,7 @@ public class ClientConfig {
         this.unitMode = cc.unitMode;
         this.unitName = cc.unitName;
         this.vipChannelEnabled = cc.vipChannelEnabled;
+        this.useTLS = cc.useTLS;
     }
 
     public ClientConfig cloneClientConfig() {
@@ -106,6 +109,7 @@ public class ClientConfig {
         cc.unitMode = unitMode;
         cc.unitName = unitName;
         cc.vipChannelEnabled = vipChannelEnabled;
+        cc.useTLS = useTLS;
         return cc;
     }
 
@@ -173,12 +177,20 @@ public class ClientConfig {
         this.vipChannelEnabled = vipChannelEnabled;
     }
 
+    public boolean isUseTLS() {
+        return useTLS;
+    }
+
+    public void setUseTLS(boolean useTLS) {
+        this.useTLS = useTLS;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
             + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-            + vipChannelEnabled + "]";
+            + vipChannelEnabled + ", useTLS=" + useTLS + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index f146be9..463b2ce 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -127,6 +127,7 @@ public class MQClientInstance {
         this.instanceIndex = instanceIndex;
         this.nettyClientConfig = new NettyClientConfig();
         this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
+        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
         this.clientRemotingProcessor = new ClientRemotingProcessor(this);
         this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 6f132f7..e8d87b7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -17,7 +17,9 @@
 
 package org.apache.rocketmq.common.protocol;
 
-public class RequestCode {
+import org.apache.rocketmq.remoting.protocol.RemotingSysRequestCode;
+
+public class RequestCode extends RemotingSysRequestCode {
 
     public static final int SEND_MESSAGE = 10;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/pom.xml
----------------------------------------------------------------------
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 1552341..413b13d 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -45,5 +45,23 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+            <version>2.0.0.Final</version>
+            <classifier>${os.detected.classifier}</classifier>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
+
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.4.0.Final</version>
+            </extension>
+        </extensions>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 276a565..b527408 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -27,24 +27,24 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface RemotingClient extends RemotingService {
 
-    public void updateNameServerAddressList(final List<String> addrs);
+    void updateNameServerAddressList(final List<String> addrs);
 
-    public List<String> getNameServerAddressList();
+    List<String> getNameServerAddressList();
 
-    public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
         final long timeoutMillis) throws InterruptedException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException;
 
-    public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
+    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
         final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
         RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
 
-    public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
+    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
         throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
         RemotingTimeoutException, RemotingSendRequestException;
 
-    public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
+    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
         final ExecutorService executor);
 
-    public boolean isChannelWriteable(final String addr);
+    boolean isChannelWritable(final String addr);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
new file mode 100644
index 0000000..c7e5af4
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import io.netty.handler.ssl.SslHandler;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * <p>
+ *     By default, file regions are transferred directly to the socket channel, which is known as zero copy. When the
+ *     transmission needs to be encrypted, the data being sent must pass through the {@link SslHandler} instead. This
+ *     encoder makes that possible.
+ * </p>
+ */
+public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
+
+    /**
+     * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
+     * can be handled by this encoder.
+     *
+     * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
+     * io.netty.handler.codec.MessageToByteEncoder} belongs to
+     * @param msg the message to encode
+     * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
+     * @throws Exception is thrown if an error occurs
+     */
+    @Override
+    protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
+        WritableByteChannel writableByteChannel = new WritableByteChannel() {
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                out.writeBytes(src);
+                return out.capacity();
+            }
+
+            @Override
+            public boolean isOpen() {
+                return true;
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+        };
+
+        while (true) {
+            long position = msg.transfered();
+            msg.transferTo(writableByteChannel, position);
+            if (msg.count() == 0) {
+                break;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 9edaa54..fbc071b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -38,6 +38,8 @@ public class NettyClientConfig {
     private boolean clientPooledByteBufAllocatorEnable = false;
     private boolean clientCloseSocketIfTimeout = false;
 
+    private boolean useTLS;
+
     public boolean isClientCloseSocketIfTimeout() {
         return clientCloseSocketIfTimeout;
     }
@@ -125,4 +127,12 @@ public class NettyClientConfig {
     public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
         this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
     }
+
+    public boolean isUseTLS() {
+        return useTLS;
+    }
+
+    public void setUseTLS(boolean useTLS) {
+        this.useTLS = useTLS;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 0ba714a..73fcee0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -20,6 +20,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslContext;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -89,6 +90,11 @@ public abstract class NettyRemotingAbstract {
     protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
 
     /**
+     * SSL Context.
+     */
+    protected SslContext sslContext;
+
+    /**
      * Constructor, specifying capacity of one-way and asynchronous semaphores.
      * @param permitsOneway Number of permits for one-way requests.
      * @param permitsAsync Number of permits for asynchronous requests.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 1c3da9a..9f2d062 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -34,6 +35,7 @@ import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.net.SocketAddress;
+import java.security.cert.CertificateException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLException;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -120,6 +123,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
             }
         });
+
+        if (NettySystemConfig.enableSSL) {
+            try {
+                sslContext = SslHelper.buildSslContext(true);
+                log.info("SSL enabled for client");
+            } catch (SSLException e) {
+                log.error("Failed to create SSLContext", e);
+            } catch (CertificateException e) {
+                log.error("Failed to create SSLContext", e);
+                throw new RuntimeException("Failed to create SSLContext", e);
+            }
+        }
     }
 
     private static int initValueIndex() {
@@ -151,7 +166,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
-                    ch.pipeline().addLast(
+                    ChannelPipeline pipeline = ch.pipeline();
+                    if (nettyClientConfig.isUseTLS() && null != sslContext) {
+                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
+                        log.info("Prepend SSL handler");
+                    }
+                    pipeline.addLast(
                         defaultEventExecutorGroup,
                         new NettyEncoder(),
                         new NettyDecoder(),
@@ -421,17 +441,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     private Channel createChannel(final String addr) throws InterruptedException {
         ChannelWrapper cw = this.channelTables.get(addr);
         if (cw != null && cw.isOK()) {
-            return cw.getChannel();
+            cw.getChannel().close();
+            channelTables.remove(addr);
         }
 
         if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
             try {
-                boolean createNewConnection = false;
+                boolean createNewConnection;
                 cw = this.channelTables.get(addr);
                 if (cw != null) {
 
                     if (cw.isOK()) {
-                        return cw.getChannel();
+                        cw.getChannel().close();
+                        this.channelTables.remove(addr);
+                        createNewConnection = true;
                     } else if (!cw.getChannelFuture().isDone()) {
                         createNewConnection = false;
                     } else {
@@ -530,10 +553,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     }
 
     @Override
-    public boolean isChannelWriteable(String addr) {
+    public boolean isChannelWritable(String addr) {
         ChannelWrapper cw = this.channelTables.get(addr);
         if (cw != null && cw.isOK()) {
-            return cw.isWriteable();
+            return cw.isWritable();
         }
         return true;
     }
@@ -569,7 +592,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
         }
 
-        public boolean isWriteable() {
+        public boolean isWritable() {
             return this.channelFuture.channel().isWritable();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index a9a55ab..70e5bae 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.remoting.netty;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
@@ -37,12 +38,14 @@ import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.net.ssl.SSLException;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -74,6 +77,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
     private int port = 0;
 
+    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+    private static final String TLS_HANDLER_NAME = "sslHandler";
+    private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+
     public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
         this(nettyServerConfig, null);
     }
@@ -128,6 +135,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                 }
             });
         }
+
+        if (NettySystemConfig.enableSSL) {
+            try {
+                sslContext = SslHelper.buildSslContext(false);
+                log.info("SSL enabled for server");
+            } catch (CertificateException e) {
+                log.error("Failed to create SSLContext for server", e);
+                throw new RuntimeException(e);
+            } catch (SSLException e) {
+                log.error("Failed to create SSLContext for server", e);
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     private boolean useEpoll() {
@@ -163,13 +183,15 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
-                        ch.pipeline().addLast(
-                            defaultEventExecutorGroup,
-                            new NettyEncoder(),
-                            new NettyDecoder(),
-                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
-                            new NettyConnectManageHandler(),
-                            new NettyServerHandler());
+                        ch.pipeline()
+                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
+                            .addLast(defaultEventExecutorGroup,
+                                new NettyEncoder(),
+                                new NettyDecoder(),
+                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                                new NettyConnectManageHandler(),
+                                new NettyServerHandler()
+                            );
                     }
                 });
 
@@ -297,6 +319,42 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         return this.publicExecutor;
     }
 
+    class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+        private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+
+            // Mark the current position so that we can peek at the first byte to determine whether the content starts
+            // with a TLS handshake.
+            msg.markReaderIndex();
+
+            byte b = msg.getByte(0);
+
+            if (b == HANDSHAKE_MAGIC_CODE) {
+                if (null != sslContext) {
+                    ctx.pipeline()
+                        .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
+                        .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+                    log.info("SSL handler prepended to channel pipeline");
+                } else {
+                    ctx.close();
+                    log.error("Requiring SSL handler but sslContext is being null");
+                }
+            }
+
+            // Reset the reader index so that handshake negotiation may proceed as normal.
+            msg.resetReaderIndex();
+
+            // Remove this handler
+            ctx.pipeline().remove(HANDSHAKE_HANDLER_NAME);
+
+            // Hand over this message to the next handler.
+            ctx.fireChannelRead(msg.retain());
+        }
+    }
+
     class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 52556fc..4a071c5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -28,9 +28,15 @@ public class NettySystemConfig {
         "com.rocketmq.remoting.clientAsyncSemaphoreValue";
     public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = //
         "com.rocketmq.remoting.clientOnewaySemaphoreValue";
+
+    public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_ENABLE = //
+        "org.apache.rocketmq.remoting.ssl.enable";
+
+    public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = //
+        "org.apache.rocketmq.remoting.ssl.config.file";
+
     public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
-        Boolean
-            .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
+        Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
     public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
         Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
     public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = //
@@ -39,4 +45,10 @@ public class NettySystemConfig {
         Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
     public static int socketRcvbufSize = //
         Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
+
+    public static boolean enableSSL = //
+        Boolean.parseBoolean(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_ENABLE, "true"));
+
+    public static String sslConfigFile = //
+        System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties");
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
new file mode 100644
index 0000000..95bcdc4
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+import javax.net.ssl.SSLException;
+
+/**
+ * Static helper that builds a Netty {@link SslContext} for the client or the server side.
+ *
+ * <p>Settings come from the properties file whose path is {@code NettySystemConfig.sslConfigFile}.
+ * If that path is not an existing, readable regular file, the helper switches to "test mode" and
+ * builds a context that needs no configuration at all (see {@link #buildSslContext(boolean)}).
+ */
+public class SslHelper {
+
+    /**
+     * Builds an {@link SslContext} for one side of a connection.
+     *
+     * <p>Test mode (config file missing, not a regular file, or unreadable):
+     * <ul>
+     *   <li>client: JDK provider with {@link InsecureTrustManagerFactory}, i.e. the peer
+     *       certificate is not verified;</li>
+     *   <li>server: JDK provider with a freshly generated {@link SelfSignedCertificate} and
+     *       {@link ClientAuth#OPTIONAL}.</li>
+     * </ul>
+     *
+     * <p>Configured mode uses OpenSSL when Netty reports it available, otherwise the JDK provider,
+     * and reads these property keys:
+     * <ul>
+     *   <li>client: {@code trustManager}, {@code client.keyCertChainFile}, {@code client.keyFile}
+     *       (presence check) / {@code client.key} (value actually read), {@code client.password};</li>
+     *   <li>server: {@code server.keyCertChainFile}, {@code server.keyFile} (presence check) /
+     *       {@code server.key} (value actually read), {@code server.password},
+     *       {@code server.trustManager}, {@code server.auth.client}.</li>
+     * </ul>
+     *
+     * <p>NOTE(review): the private-key lookups test for {@code *.keyFile} but read {@code *.key};
+     * when only {@code *.keyFile} is set the read returns {@code null} and {@code new File(null)}
+     * throws {@link NullPointerException}. The client trust file is likewise read from the
+     * un-prefixed key {@code trustManager}, unlike the server's {@code server.trustManager}.
+     *
+     * <p>NOTE(review): a missing or unreadable config file silently selects the insecure test mode
+     * instead of failing; confirm this fallback is intended outside of tests.
+     *
+     * @param forClient {@code true} to build a client-side context, {@code false} for server-side
+     * @return the built context
+     * @throws SSLException declared by the Netty context builder
+     * @throws CertificateException declared for the self-signed certificate generated in
+     *     server-side test mode
+     */
+    public static SslContext buildSslContext(boolean forClient) throws SSLException, CertificateException {
+
+        File configFile = new File(NettySystemConfig.sslConfigFile);
+        // Test mode is chosen purely from the state of the config file on disk.
+        boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead());
+        Properties properties = null;
+
+        if (!testMode) {
+            properties = new Properties();
+            InputStream inputStream = null;
+            try {
+                inputStream = new FileInputStream(configFile);
+                properties.load(inputStream);
+            } catch (FileNotFoundException ignore) {
+                // NOTE(review): load failures are swallowed; the code below then runs against an
+                // empty or partially loaded Properties object.
+            } catch (IOException ignore) {
+            } finally {
+                if (null != inputStream) {
+                    try {
+                        inputStream.close();
+                    } catch (IOException ignore) {
+                    }
+                }
+            }
+        }
+
+        // Only the configured (non-test) branches use this; the test branches hard-code the JDK provider.
+        SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+
+        if (forClient) {
+            if (testMode) {
+                return SslContextBuilder
+                    .forClient()
+                    .sslProvider(SslProvider.JDK)
+                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                    .build();
+            } else {
+                return SslContextBuilder.forClient()
+                    .sslProvider(provider)
+                    .trustManager(new File(properties.getProperty("trustManager")))
+                    .keyManager(
+                        properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null,
+                        properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.key")) : null,
+                        properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
+                    .build();
+            }
+        } else {
+
+            if (testMode) {
+                SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
+                return SslContextBuilder
+                    .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
+                    .sslProvider(SslProvider.JDK)
+                    .clientAuth(ClientAuth.OPTIONAL)
+                    .build();
+            } else {
+                return SslContextBuilder.forServer(
+                    properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null,
+                    properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.key")) : null,
+                    properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
+                    .sslProvider(provider)
+                    .trustManager(new File(properties.getProperty("server.trustManager")))
+                    .clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client")))
+                    .build();
+            }
+        }
+    }
+
+    /**
+     * Maps the textual {@code server.auth.client} setting to a Netty {@link ClientAuth} mode.
+     *
+     * <p>{@code null} or blank yields {@link ClientAuth#NONE}; {@code "optional"} (case-insensitive,
+     * compared without trimming) yields {@link ClientAuth#OPTIONAL}; every other value yields
+     * {@link ClientAuth#REQUIRE}.
+     *
+     * <p>NOTE(review): because of the catch-all, an explicit {@code "none"} (or a padded
+     * {@code " optional "}) results in {@code REQUIRE}; confirm that is intended.
+     *
+     * @param authMode raw property value, may be {@code null}
+     * @return the client-authentication mode to configure on the server context
+     */
+    private static ClientAuth parseClientAuthMode(String authMode) {
+        if (null == authMode || authMode.trim().isEmpty()) {
+            return ClientAuth.NONE;
+        }
+
+        if ("optional".equalsIgnoreCase(authMode)) {
+            return ClientAuth.OPTIONAL;
+        }
+
+        return ClientAuth.REQUIRE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java
new file mode 100644
index 0000000..32783fa
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+/**
+ * Holder for integer request-code constants; it currently declares only {@link #START_TLS}.
+ */
+public class RemotingSysRequestCode {
+
+    /**
+     * Request code used to negotiate upgrading a connection to TLS.
+     */
+    public static final int START_TLS = 1;
+}


[2/2] incubator-rocketmq git commit: Fix typo

Posted by li...@apache.org.
Fix typo


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1cf9099e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1cf9099e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1cf9099e

Branch: refs/heads/tls
Commit: 1cf9099e975860eb11408b1c69f3589c6b78abf7
Parents: 66b5c72
Author: Li Zhanhui <li...@gmail.com>
Authored: Wed Jun 14 12:59:41 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Wed Jun 14 12:59:41 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/rocketmq/remoting/netty/SslHelper.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1cf9099e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
index 95bcdc4..bdf10bf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
@@ -71,10 +71,10 @@ public class SslHelper {
             } else {
                 return SslContextBuilder.forClient()
                     .sslProvider(provider)
-                    .trustManager(new File(properties.getProperty("trustManager")))
+                    .trustManager(new File(properties.getProperty("client.trustManager")))
                     .keyManager(
                         properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null,
-                        properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.key")) : null,
+                        properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.keyFile")) : null,
                         properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
                     .build();
             }
@@ -90,7 +90,7 @@ public class SslHelper {
             } else {
                 return SslContextBuilder.forServer(
                     properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null,
-                    properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.key")) : null,
+                    properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.keyFile")) : null,
                     properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
                     .sslProvider(provider)
                     .trustManager(new File(properties.getProperty("server.trustManager")))