You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/06/14 04:59:53 UTC
[1/2] incubator-rocketmq git commit: Add TLS
Repository: incubator-rocketmq
Updated Branches:
refs/heads/tls [created] 1cf9099e9
Add TLS
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/66b5c724
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/66b5c724
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/66b5c724
Branch: refs/heads/tls
Commit: 66b5c7241651f54f7d483f99278cda537b5e4aac
Parents: c4a3e0c
Author: Li Zhanhui <li...@gmail.com>
Authored: Tue Jun 13 17:15:54 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Tue Jun 13 17:15:54 2017 +0800
----------------------------------------------------------------------
.../apache/rocketmq/broker/BrokerStartup.java | 1 +
.../apache/rocketmq/client/ClientConfig.java | 14 ++-
.../client/impl/factory/MQClientInstance.java | 1 +
.../rocketmq/common/protocol/RequestCode.java | 4 +-
remoting/pom.xml | 18 +++
.../rocketmq/remoting/RemotingClient.java | 14 +--
.../remoting/netty/FileRegionEncoder.java | 76 ++++++++++++
.../remoting/netty/NettyClientConfig.java | 10 ++
.../remoting/netty/NettyRemotingAbstract.java | 6 +
.../remoting/netty/NettyRemotingClient.java | 37 ++++--
.../remoting/netty/NettyRemotingServer.java | 72 ++++++++++--
.../remoting/netty/NettySystemConfig.java | 16 ++-
.../rocketmq/remoting/netty/SslHelper.java | 115 +++++++++++++++++++
.../protocol/RemotingSysRequestCode.java | 26 +++++
14 files changed, 385 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 85d2e3a..dbea561 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -95,6 +95,7 @@ public class BrokerStartup {
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ nettyClientConfig.setUseTLS(NettySystemConfig.enableSSL);
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 950d756..8f255f0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -45,6 +45,8 @@ public class ClientConfig {
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+ private boolean useTLS;
+
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
@@ -92,6 +94,7 @@ public class ClientConfig {
this.unitMode = cc.unitMode;
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
+ this.useTLS = cc.useTLS;
}
public ClientConfig cloneClientConfig() {
@@ -106,6 +109,7 @@ public class ClientConfig {
cc.unitMode = unitMode;
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
+ cc.useTLS = useTLS;
return cc;
}
@@ -173,12 +177,20 @@ public class ClientConfig {
this.vipChannelEnabled = vipChannelEnabled;
}
+ public boolean isUseTLS() {
+ return useTLS;
+ }
+
+ public void setUseTLS(boolean useTLS) {
+ this.useTLS = useTLS;
+ }
+
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
- + vipChannelEnabled + "]";
+ + vipChannelEnabled + ", useTLS=" + useTLS + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index f146be9..463b2ce 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -127,6 +127,7 @@ public class MQClientInstance {
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
+ this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 6f132f7..e8d87b7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -17,7 +17,9 @@
package org.apache.rocketmq.common.protocol;
-public class RequestCode {
+import org.apache.rocketmq.remoting.protocol.RemotingSysRequestCode;
+
+public class RequestCode extends RemotingSysRequestCode {
public static final int SEND_MESSAGE = 10;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/pom.xml
----------------------------------------------------------------------
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 1552341..413b13d 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -45,5 +45,23 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <version>2.0.0.Final</version>
+ <classifier>${os.detected.classifier}</classifier>
+ <optional>true</optional>
+ </dependency>
</dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.4.0.Final</version>
+ </extension>
+ </extensions>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 276a565..b527408 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -27,24 +27,24 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
- public void updateNameServerAddressList(final List<String> addrs);
+ void updateNameServerAddressList(final List<String> addrs);
- public List<String> getNameServerAddressList();
+ List<String> getNameServerAddressList();
- public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+ RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
- public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
+ void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
- public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
+ void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
- public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
+ void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
- public boolean isChannelWriteable(final String addr);
+ boolean isChannelWritable(final String addr);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
new file mode 100644
index 0000000..c7e5af4
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import io.netty.handler.ssl.SslHandler;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * <p>
+ * By default, file regions are transferred directly to the socket channel, which is known as zero copy. When the
+ * transmission needs to be encrypted, the data being sent must pass through the {@link SslHandler} instead. This
+ * encoder makes that possible by writing the content of the file region into a {@link ByteBuf}.
+ * </p>
+ */
+public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
+
+ /**
+ * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that
+ * can be handled by this encoder.
+ *
+ * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link
+ * io.netty.handler.codec.MessageToByteEncoder} belongs to
+ * @param msg the message to encode
+ * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written
+ * @throws Exception is thrown if an error occurs
+ */
+ @Override
+ protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception {
+ WritableByteChannel writableByteChannel = new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ out.writeBytes(src);
+ return out.capacity();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ };
+
+ while (true) {
+ long position = msg.transfered();
+ msg.transferTo(writableByteChannel, position);
+ if (msg.count() == 0) {
+ break;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 9edaa54..fbc071b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -38,6 +38,8 @@ public class NettyClientConfig {
private boolean clientPooledByteBufAllocatorEnable = false;
private boolean clientCloseSocketIfTimeout = false;
+ private boolean useTLS;
+
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
@@ -125,4 +127,12 @@ public class NettyClientConfig {
public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
}
+
+ public boolean isUseTLS() {
+ return useTLS;
+ }
+
+ public void setUseTLS(boolean useTLS) {
+ this.useTLS = useTLS;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 0ba714a..73fcee0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -20,6 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslContext;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
@@ -89,6 +90,11 @@ public abstract class NettyRemotingAbstract {
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
+ * SSL Context.
+ */
+ protected SslContext sslContext;
+
+ /**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
* @param permitsOneway Number of permits for one-way requests.
* @param permitsAsync Number of permits for asynchronous requests.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 1c3da9a..9f2d062 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -34,6 +35,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.SocketAddress;
+import java.security.cert.CertificateException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -120,6 +123,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
+
+ if (NettySystemConfig.enableSSL) {
+ try {
+ sslContext = SslHelper.buildSslContext(true);
+ log.info("SSL enabled for client");
+ } catch (SSLException e) {
+ log.error("Failed to create SSLContext", e);
+ } catch (CertificateException e) {
+ log.error("Failed to create SSLContext", e);
+ throw new RuntimeException("Failed to create SSLContext", e);
+ }
+ }
}
private static int initValueIndex() {
@@ -151,7 +166,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
+ ChannelPipeline pipeline = ch.pipeline();
+ if (nettyClientConfig.isUseTLS() && null != sslContext) {
+ pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
+ log.info("Prepend SSL handler");
+ }
+ pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
@@ -421,17 +441,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
- return cw.getChannel();
+ cw.getChannel().close();
+ channelTables.remove(addr);
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
- boolean createNewConnection = false;
+ boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
- return cw.getChannel();
+ cw.getChannel().close();
+ this.channelTables.remove(addr);
+ createNewConnection = true;
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
@@ -530,10 +553,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
@Override
- public boolean isChannelWriteable(String addr) {
+ public boolean isChannelWritable(String addr) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
- return cw.isWriteable();
+ return cw.isWritable();
}
return true;
}
@@ -569,7 +592,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
- public boolean isWriteable() {
+ public boolean isWritable() {
return this.channelFuture.channel().isWritable();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index a9a55ab..70e5bae 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
@@ -37,12 +38,14 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.net.ssl.SSLException;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -74,6 +77,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private int port = 0;
+ private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+ private static final String TLS_HANDLER_NAME = "sslHandler";
+ private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
@@ -128,6 +135,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
});
}
+
+ if (NettySystemConfig.enableSSL) {
+ try {
+ sslContext = SslHelper.buildSslContext(false);
+ log.info("SSL enabled for server");
+ } catch (CertificateException e) {
+ log.error("Failed to create SSLContext for server", e);
+ throw new RuntimeException(e);
+ } catch (SSLException e) {
+ log.error("Failed to create SSLContext for server", e);
+ throw new RuntimeException(e);
+ }
+ }
}
private boolean useEpoll() {
@@ -163,13 +183,15 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
- defaultEventExecutorGroup,
- new NettyEncoder(),
- new NettyDecoder(),
- new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- new NettyConnectManageHandler(),
- new NettyServerHandler());
+ ch.pipeline()
+ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
+ .addLast(defaultEventExecutorGroup,
+ new NettyEncoder(),
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ new NettyConnectManageHandler(),
+ new NettyServerHandler()
+ );
}
});
@@ -297,6 +319,42 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return this.publicExecutor;
}
+ class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+ private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+
+ // Mark the current reader position so that we can peek at the first byte to determine whether the content
+ // starts with a TLS handshake record.
+ msg.markReaderIndex();
+
+ byte b = msg.getByte(0);
+
+ if (b == HANDSHAKE_MAGIC_CODE) {
+ if (null != sslContext) {
+ ctx.pipeline()
+ .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+ log.info("SSL handler prepended to channel pipeline");
+ } else {
+ ctx.close();
+ log.error("Requiring SSL handler but sslContext is being null");
+ }
+ }
+
+ // reset the reader index so that handshake negotiation may proceed as normal.
+ msg.resetReaderIndex();
+
+ // Remove this handler
+ ctx.pipeline().remove(HANDSHAKE_HANDLER_NAME);
+
+ // Hand this message over to the next handler in the pipeline.
+ ctx.fireChannelRead(msg.retain());
+ }
+ }
+
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 52556fc..4a071c5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -28,9 +28,15 @@ public class NettySystemConfig {
"com.rocketmq.remoting.clientAsyncSemaphoreValue";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = //
"com.rocketmq.remoting.clientOnewaySemaphoreValue";
+
+ public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_ENABLE = //
+ "org.apache.rocketmq.remoting.ssl.enable";
+
+ public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = //
+ "org.apache.rocketmq.remoting.ssl.config.file";
+
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
- Boolean
- .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
+ Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = //
@@ -39,4 +45,10 @@ public class NettySystemConfig {
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
public static int socketRcvbufSize = //
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
+
+ public static boolean enableSSL = //
+ Boolean.parseBoolean(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_ENABLE, "true"));
+
+ public static String sslConfigFile = //
+ System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties");
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
new file mode 100644
index 0000000..95bcdc4
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+import javax.net.ssl.SSLException;
+
+public class SslHelper {
+
+ public static SslContext buildSslContext(boolean forClient) throws SSLException, CertificateException {
+
+ File configFile = new File(NettySystemConfig.sslConfigFile);
+ boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead());
+ Properties properties = null;
+
+ if (!testMode) {
+ properties = new Properties();
+ InputStream inputStream = null;
+ try {
+ inputStream = new FileInputStream(configFile);
+ properties.load(inputStream);
+ } catch (FileNotFoundException ignore) {
+ } catch (IOException ignore) {
+ } finally {
+ if (null != inputStream) {
+ try {
+ inputStream.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+ }
+
+ SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+
+ if (forClient) {
+ if (testMode) {
+ return SslContextBuilder
+ .forClient()
+ .sslProvider(SslProvider.JDK)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .build();
+ } else {
+ return SslContextBuilder.forClient()
+ .sslProvider(provider)
+ .trustManager(new File(properties.getProperty("trustManager")))
+ .keyManager(
+ properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null,
+ properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.key")) : null,
+ properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
+ .build();
+ }
+ } else {
+
+ if (testMode) {
+ SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
+ return SslContextBuilder
+ .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
+ .sslProvider(SslProvider.JDK)
+ .clientAuth(ClientAuth.OPTIONAL)
+ .build();
+ } else {
+ return SslContextBuilder.forServer(
+ properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null,
+ properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.key")) : null,
+ properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
+ .sslProvider(provider)
+ .trustManager(new File(properties.getProperty("server.trustManager")))
+ .clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client")))
+ .build();
+ }
+ }
+ }
+
+ private static ClientAuth parseClientAuthMode(String authMode) {
+ if (null == authMode || authMode.trim().isEmpty()) {
+ return ClientAuth.NONE;
+ }
+
+ if ("optional".equalsIgnoreCase(authMode)) {
+ return ClientAuth.OPTIONAL;
+ }
+
+ return ClientAuth.REQUIRE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java
new file mode 100644
index 0000000..32783fa
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+public class RemotingSysRequestCode {
+
+ /**
+ * Request to negotiate upgrading the connection to TLS.
+ */
+ public static final int START_TLS = 1;
+}
[2/2] incubator-rocketmq git commit: Fix typo
Posted by li...@apache.org.
Fix typo
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1cf9099e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1cf9099e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1cf9099e
Branch: refs/heads/tls
Commit: 1cf9099e975860eb11408b1c69f3589c6b78abf7
Parents: 66b5c72
Author: Li Zhanhui <li...@gmail.com>
Authored: Wed Jun 14 12:59:41 2017 +0800
Committer: Li Zhanhui <li...@gmail.com>
Committed: Wed Jun 14 12:59:41 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/rocketmq/remoting/netty/SslHelper.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1cf9099e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
index 95bcdc4..bdf10bf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
@@ -71,10 +71,10 @@ public class SslHelper {
} else {
return SslContextBuilder.forClient()
.sslProvider(provider)
- .trustManager(new File(properties.getProperty("trustManager")))
+ .trustManager(new File(properties.getProperty("client.trustManager")))
.keyManager(
properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null,
- properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.key")) : null,
+ properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.keyFile")) : null,
properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
.build();
}
@@ -90,7 +90,7 @@ public class SslHelper {
} else {
return SslContextBuilder.forServer(
properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null,
- properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.key")) : null,
+ properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.keyFile")) : null,
properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
.sslProvider(provider)
.trustManager(new File(properties.getProperty("server.trustManager")))