You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/19 07:23:49 UTC
[5/8] incubator-rocketmq git commit: initialize RocketMQ5
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
new file mode 100644
index 0000000..d875f95
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
+import org.apache.rocketmq.remoting.impl.netty.handler.ChannelStatistics;
+import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+import org.apache.rocketmq.remoting.internal.JvmUtils;
+
+/**
+ * Netty-backed {@link RemotingServer}.
+ * <p>
+ * The acceptor ("boss"), IO and worker executor groups are created in the constructor, the
+ * listening socket is bound in {@link #start()} and every group is shut down in {@link #stop()}.
+ */
+public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+    private final RemotingConfig serverConfig;
+
+    private final ServerBootstrap serverBootstrap;
+    private final EventLoopGroup bossGroup;
+    private final EventLoopGroup ioGroup;
+    private EventExecutorGroup workerGroup;
+    private Class<? extends ServerSocketChannel> socketChannelClass;
+
+    // Port actually bound by start(); stays 0 until then.
+    private int port;
+    // Remains null when buildHttp2SslContext() fails.
+    private SslContext sslContext;
+
+    /**
+     * Creates the event loop groups and the SSL context; nothing is bound until {@link #start()}.
+     * <p>
+     * Epoll transport is used only when {@code JvmUtils.isLinux()} and
+     * {@code serverConfig.isServerNativeEpollEnable()} are both true; otherwise NIO is used.
+     *
+     * @param serverConfig thread counts, socket options and listen port for this server
+     */
+    NettyRemotingServer(final RemotingConfig serverConfig) {
+        super(serverConfig);
+
+        this.serverBootstrap = new ServerBootstrap();
+        this.serverConfig = serverConfig;
+
+        if (JvmUtils.isLinux() && this.serverConfig.isServerNativeEpollEnable()) {
+            this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerIoThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
+                serverConfig.getServerIoThreads()));
+
+            this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+
+            this.socketChannelClass = EpollServerSocketChannel.class;
+        } else {
+            this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                serverConfig.getServerAcceptorThreads()));
+
+            this.ioGroup = new NioEventLoopGroup(serverConfig.getServerIoThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
+                serverConfig.getServerIoThreads()));
+
+            this.socketChannelClass = NioServerSocketChannel.class;
+        }
+
+        this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
+            ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
+
+        buildHttp2SslContext();
+    }
+
+    /**
+     * Builds {@link #sslContext} from a freshly generated self-signed certificate, using the
+     * OpenSSL provider when it is available and the JDK provider otherwise.
+     * <p>
+     * A failure is only logged, which leaves {@link #sslContext} as {@code null}.
+     */
+    private void buildHttp2SslContext() {
+        try {
+            SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+            //NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+            //Please refer to the HTTP/2 specification for cipher requirements.
+            SelfSignedCertificate ssc = new SelfSignedCertificate();
+            sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
+                .sslProvider(provider)
+                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
+        } catch (Exception e) {
+            LOG.error("Can not build SSL context !", e);
+        }
+    }
+
+    /**
+     * Copies the TCP settings of {@link #serverConfig} onto the bootstrap.
+     * <p>
+     * {@code option()} configures the listening server channel, {@code childOption()} configures
+     * the channels accepted from it. Size-like settings are applied only when positive.
+     *
+     * @param bootstrap bootstrap to configure
+     */
+    private void applyOptions(ServerBootstrap bootstrap) {
+        // serverConfig is final and already dereferenced by the constructor, so no null guard is needed.
+        if (serverConfig.getTcpSoBacklogSize() > 0) {
+            bootstrap.option(ChannelOption.SO_BACKLOG, serverConfig.getTcpSoBacklogSize());
+        }
+
+        // SO_LINGER is a per-connection option, so it must be set on the accepted (child)
+        // channels; the listening server channel does not support it.
+        if (serverConfig.getTcpSoLinger() > 0) {
+            bootstrap.childOption(ChannelOption.SO_LINGER, serverConfig.getTcpSoLinger());
+        }
+
+        if (serverConfig.getTcpSoSndBufSize() > 0) {
+            bootstrap.childOption(ChannelOption.SO_SNDBUF, serverConfig.getTcpSoSndBufSize());
+        }
+        if (serverConfig.getTcpSoRcvBufSize() > 0) {
+            bootstrap.childOption(ChannelOption.SO_RCVBUF, serverConfig.getTcpSoRcvBufSize());
+        }
+
+        bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()).
+            childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()).
+            childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()).
+            option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout());
+
+        if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
+            bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        }
+    }
+
+    /**
+     * Configures the bootstrap, binds the configured listen port (blocking until the bind
+     * completes), records the bound port and then calls {@code startUpHouseKeepingService()},
+     * which is defined outside this class.
+     */
+    @Override
+    public void start() {
+        super.start();
+
+        final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+        this.serverBootstrap.group(this.bossGroup, this.ioGroup).
+            channel(socketChannelClass).childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    channels.add(ch);
+
+                    ChannelPipeline cp = ch.pipeline();
+
+                    cp.addLast(ChannelStatistics.NAME, new ChannelStatistics(channels));
+
+                    // addFirst: the protocol selector sits at the head of the pipeline.
+                    cp.addFirst(ProtocolSelector.NAME, new ProtocolSelector(sslContext));
+                    // These three handlers are bound to workerGroup instead of the channel's IO loop.
+                    cp.addLast(workerGroup, new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
+                            serverConfig.getConnectionChannelWriterIdleSeconds(),
+                            serverConfig.getConnectionChannelIdleSeconds()),
+                        new ServerConnectionHandler(),
+                        new EventDispatcher());
+                }
+            });
+
+        applyOptions(serverBootstrap);
+
+        ChannelFuture channelFuture = this.serverBootstrap.bind(this.serverConfig.getServerListenPort()).syncUninterruptibly();
+        // Read the port back from the bound channel rather than from the configuration.
+        this.port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
+
+        startUpHouseKeepingService();
+    }
+
+    /**
+     * Shuts down the house-keeping service, the channel event executor and the three Netty
+     * groups, waiting for each group to terminate. Any exception raised on the way is logged;
+     * {@code super.stop()} is always invoked afterwards.
+     */
+    @Override
+    public void stop() {
+        try {
+
+            ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
+
+            ThreadUtils.shutdownGracefully(channelEventExecutor);
+
+            this.bossGroup.shutdownGracefully().syncUninterruptibly();
+
+            this.ioGroup.shutdownGracefully().syncUninterruptibly();
+
+            this.workerGroup.shutdownGracefully().syncUninterruptibly();
+        } catch (Exception e) {
+            LOG.error("RemotingServer stopped error !", e);
+        }
+
+        super.stop();
+    }
+
+    /**
+     * @return the port recorded by {@link #start()} after a successful bind
+     */
+    @Override
+    public int localListenPort() {
+        return this.port;
+    }
+
+    /**
+     * Sends {@code request} over the given channel and returns the result of
+     * {@code invokeWithInterceptor}.
+     *
+     * @param remotingChannel target channel; cast to {@link NettyChannelImpl}, so any other
+     *                        implementation fails with a {@link ClassCastException}
+     * @param request         command to send
+     * @param timeoutMillis   timeout passed through to {@code invokeWithInterceptor}
+     * @return the command returned by {@code invokeWithInterceptor}
+     */
+    @Override
+    public RemotingCommand invoke(final RemotingChannel remotingChannel, final RemotingCommand request,
+        final long timeoutMillis) {
+        return invokeWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request, timeoutMillis);
+    }
+
+    /**
+     * Asynchronous variant of {@link #invoke}; delegates to {@code invokeAsyncWithInterceptor}.
+     *
+     * @param remotingChannel target channel; must be a {@link NettyChannelImpl}
+     * @param request         command to send
+     * @param asyncHandler    handler passed through to {@code invokeAsyncWithInterceptor}
+     * @param timeoutMillis   timeout passed through to {@code invokeAsyncWithInterceptor}
+     */
+    @Override
+    public void invokeAsync(final RemotingChannel remotingChannel, final RemotingCommand request,
+        final AsyncHandler asyncHandler,
+        final long timeoutMillis) {
+        invokeAsyncWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request, asyncHandler, timeoutMillis);
+    }
+
+    /**
+     * One-way variant of {@link #invoke}; delegates to {@code invokeOnewayWithInterceptor}.
+     *
+     * @param remotingChannel target channel; must be a {@link NettyChannelImpl}
+     * @param request         command to send
+     * @param timeoutMillis   timeout passed through to {@code invokeOnewayWithInterceptor}
+     */
+    @Override
+    public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request,
+        final long timeoutMillis) {
+        invokeOnewayWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request, timeoutMillis);
+    }
+
+    /**
+     * Translates connection life-cycle callbacks into {@code NettyChannelEvent}s handed to
+     * {@code putNettyEvent}, and closes channels that are fully idle or that raised an exception.
+     */
+    private class ServerConnectionHandler extends ChannelDuplexHandler {
+        @Override
+        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+            LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
+                ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
+            // Keep forwarding the event so handlers later in the pipeline still observe it.
+            super.channelWritabilityChanged(ctx);
+        }
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            super.channelActive(ctx);
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            super.channelInactive(ctx);
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+        }
+
+        /**
+         * Closes the channel and publishes an IDLE event when an ALL_IDLE event arrives; every
+         * user event is forwarded to the next handler afterwards.
+         */
+        @Override
+        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent) {
+                final IdleStateEvent event = (IdleStateEvent) evt;
+                if (event.state().equals(IdleState.ALL_IDLE)) {
+                    ctx.channel().close().addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            LOG.warn("Close channel {} because of event {},result is {}", ctx.channel(), event, future.isSuccess());
+                        }
+                    });
+
+                    putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel()));
+                }
+            }
+            ctx.fireUserEventTriggered(evt);
+        }
+
+        /**
+         * Publishes an EXCEPTION event and closes the channel; the exception is not forwarded.
+         */
+        @Override
+        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
+
+            ctx.channel().close().addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), cause, future.isSuccess());
+                }
+            });
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java
new file mode 100644
index 0000000..4dd502c
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/RemotingBootstrapFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import java.util.Properties;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.internal.BeanUtils;
+import org.apache.rocketmq.remoting.internal.PropertyUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Entry point for creating remoting clients and servers.
+ * <p>
+ * Each factory method accepts the configuration as a properties file name, a
+ * {@link Properties} instance or a ready-made {@link RemotingConfig}.
+ */
+public final class RemotingBootstrapFactory {
+    /**
+     * Creates a client configured from a properties file.
+     *
+     * @param fileName file handed to {@code PropertyUtils.loadProps}
+     * @return a new {@link NettyRemotingClient}
+     */
+    public static RemotingClient createRemotingClient(@NotNull final String fileName) {
+        final RemotingConfig clientConfig = toConfig(PropertyUtils.loadProps(fileName));
+        return new NettyRemotingClient(clientConfig);
+    }
+
+    /**
+     * Creates a client from an existing configuration object.
+     *
+     * @param config configuration passed to the client unchanged
+     * @return a new {@link NettyRemotingClient}
+     */
+    public static RemotingClient createRemotingClient(@NotNull final RemotingConfig config) {
+        return new NettyRemotingClient(config);
+    }
+
+    /**
+     * Creates a client configured from in-memory properties.
+     *
+     * @param properties properties populated into a {@link RemotingConfig}
+     * @return a new {@link NettyRemotingClient}
+     */
+    public static RemotingClient createRemotingClient(@NotNull final Properties properties) {
+        final RemotingConfig clientConfig = toConfig(properties);
+        return new NettyRemotingClient(clientConfig);
+    }
+
+    /**
+     * Creates a server configured from a properties file.
+     *
+     * @param fileName file handed to {@code PropertyUtils.loadProps}
+     * @return a new {@link NettyRemotingServer}
+     */
+    public static NettyRemotingServer createRemotingServer(@NotNull final String fileName) {
+        final RemotingConfig serverConfig = toConfig(PropertyUtils.loadProps(fileName));
+        return new NettyRemotingServer(serverConfig);
+    }
+
+    /**
+     * Creates a server configured from in-memory properties.
+     *
+     * @param properties properties populated into a {@link RemotingConfig}
+     * @return a new {@link NettyRemotingServer}
+     */
+    public static NettyRemotingServer createRemotingServer(@NotNull final Properties properties) {
+        final RemotingConfig serverConfig = toConfig(properties);
+        return new NettyRemotingServer(serverConfig);
+    }
+
+    /**
+     * Creates a server from an existing configuration object.
+     *
+     * @param config configuration passed to the server unchanged
+     * @return a new {@link NettyRemotingServer}
+     */
+    public static NettyRemotingServer createRemotingServer(@NotNull final RemotingConfig config) {
+        return new NettyRemotingServer(config);
+    }
+
+    /** Populates a {@link RemotingConfig} bean from the given properties via {@code BeanUtils.populate}. */
+    private static RemotingConfig toConfig(final Properties source) {
+        return BeanUtils.populate(source, RemotingConfig.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java
new file mode 100755
index 0000000..ff0f9c9
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ChannelStatistics.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.remoting.common.metrics.ChannelMetrics;
+
+/**
+ * Pipeline handler that tracks active channels in a {@link ChannelGroup} and exposes that group
+ * through {@link ChannelMetrics}.
+ * <p>
+ * The handler is not marked sharable, so every pipeline needs its own instance. All state that is
+ * meant to span channels therefore lives in the group passed to the constructor.
+ */
+public class ChannelStatistics extends ChannelDuplexHandler implements ChannelMetrics {
+    public static final String NAME = ChannelStatistics.class.getSimpleName();
+    private final ChannelGroup allChannels;
+
+    /**
+     * @param allChannels group this handler adds its channel to when it becomes active and
+     *                    removes it from when it becomes inactive
+     */
+    public ChannelStatistics(ChannelGroup allChannels) {
+        this.allChannels = allChannels;
+    }
+
+    /** Registers the newly active channel in the group, then forwards the event. */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        // connect
+        allChannels.add(ctx.channel());
+        super.channelActive(ctx);
+    }
+
+    /** Removes the channel from the group, then forwards the event. */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // disconnect
+        allChannels.remove(ctx.channel());
+        super.channelInactive(ctx);
+    }
+
+    /**
+     * Returns the number of channels currently held by the group.
+     * <p>
+     * A counter kept on this handler could only ever read 0 or 1, because each channel has its
+     * own handler instance; the group size is the count across all channels.
+     *
+     * @return current size of the channel group
+     */
+    @Override
+    public Integer getChannelCount() {
+        return allChannels.size();
+    }
+
+    /**
+     * @return the group passed to the constructor
+     */
+    @Override
+    public ChannelGroup getChannels() {
+        return allChannels;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
new file mode 100644
index 0000000..87a0912
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Decoder.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
+import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Decoder extends ByteToMessageDecoder {
+ private static final Logger LOG = LoggerFactory.getLogger(Decoder.class);
+
+ public Decoder() {
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ if (!in.isReadable()) {
+ return;
+ }
+
+ NettyByteBufferWrapper wrapper = new NettyByteBufferWrapper(in, ctx.channel());
+
+ Object msg = this.decode(ctx, wrapper);
+ if (msg != null) {
+ out.add(msg);
+ }
+ }
+
+ private Object decode(final ChannelHandlerContext ctx, ByteBufferWrapper wrapper) throws Exception {
+ int originReaderIndex = wrapper.readerIndex();
+
+ byte type = wrapper.readByte();
+ try {
+ RemotingCommand cmd = decode(wrapper, originReaderIndex);
+ if (cmd != null) {
+ cmd.protocolType(type);
+ }
+ return cmd;
+ } catch (final RemoteCodecException e) {
+ LOG.warn("Decode error {}, close the channel {}", e.getMessage(), ctx.channel());
+ ctx.channel().close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), e, future.isSuccess());
+ }
+ });
+ }
+ return null;
+ }
+
+ public RemotingCommand decode(final ByteBufferWrapper wrapper, final int originReaderIndex) {
+ // Full message isn't available yet, return nothing for now
+ if (wrapper.readableBytes() < CodecHelper.MIN_PROTOCOL_LEN - 1) {
+ wrapper.setReaderIndex(originReaderIndex);
+ return null;
+ }
+
+ int totalLength = wrapper.readInt();
+
+ if (totalLength <= 0) {
+ throw new IllegalArgumentException("Illegal total length " + totalLength);
+ }
+
+ if (totalLength > CodecHelper.PACKET_MAX_LEN) {
+ throw new IllegalArgumentException(String.format("Total length %d is more than limit %d", totalLength, CodecHelper.PACKET_MAX_LEN));
+ }
+
+ if (wrapper.readableBytes() < totalLength) {
+ wrapper.setReaderIndex(originReaderIndex);
+ return null;
+ }
+
+ ByteBuffer totalBuffer = ByteBuffer.allocate(totalLength);
+
+ wrapper.readBytes(totalBuffer);
+
+ totalBuffer.flip();
+
+ return CodecHelper.decode(totalBuffer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
new file mode 100644
index 0000000..10aa504
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Encoder extends MessageToByteEncoder<RemotingCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(Encoder.class);
+
+ private final SerializerFactory serializerFactory = new SerializerFactoryImpl();
+
+ public Encoder() {
+ }
+
+ @Override
+ public void encode(final ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception {
+ try {
+ ByteBufferWrapper wrapper = new NettyByteBufferWrapper(out);
+
+ encode(serializerFactory, remotingCommand, wrapper);
+ } catch (final Exception e) {
+ LOG.error("Error occurred when encoding response for channel " + ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(), e);
+ if (remotingCommand != null) {
+ LOG.error(remotingCommand.toString());
+ }
+ ctx.channel().close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ LOG.warn("Close channel {} because of error {},result is {}", ctx.channel(), e, future.isSuccess());
+ }
+ });
+ }
+ }
+
+ public void encode(final SerializerFactory serializerFactory, final RemotingCommand remotingCommand,
+ final ByteBufferWrapper out) {
+ ByteBuffer encodeParameter = null;
+ if (remotingCommand.parameterBytes() != null) {
+ encodeParameter = ByteBuffer.wrap(remotingCommand.parameterBytes());
+ } else if (remotingCommand.parameter() != null) {
+ final Serializer serialization = serializerFactory.get(remotingCommand.serializerType());
+ encodeParameter = serialization.encode(remotingCommand.parameter());
+ }
+
+ int parameterLength = encodeParameter != null ? encodeParameter.limit() : 0;
+ int extBodyLength = remotingCommand.extraPayload() != null ? remotingCommand.extraPayload().length : 0;
+
+ ByteBuffer header = CodecHelper.encodeHeader(remotingCommand, parameterLength, extBodyLength);
+ out.writeBytes(header);
+
+ if (encodeParameter != null) {
+ out.writeBytes(encodeParameter);
+ }
+
+ if (remotingCommand.extraPayload() != null) {
+ out.writeBytes(remotingCommand.extraPayload());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
new file mode 100644
index 0000000..52563f4
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ExceptionHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ChannelHandler.Sharable
+public class ExceptionHandler extends ChannelDuplexHandler {
+ private final static Logger LOG = LoggerFactory.getLogger(ExceptionHandler.class);
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ // Uncaught exceptions from inbound handlers will propagate up to this handler
+ LOG.error(String.format("channel exception %s occurred ! ", ctx.channel()), cause);
+ ctx.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
new file mode 100644
index 0000000..7cdb976
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameAdapter;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersDecoder;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.StreamBufferingEncoder;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
+
+/**
+ * HTTP/2 connection handler that tunnels opaque {@link ByteBuf} payloads.
+ *
+ * <p>Inbound DATA frames are forwarded to the next handler as plain buffers.
+ * Outbound buffers are written as a HEADERS + DATA pair; on the server side
+ * they are preceded by a PUSH_PROMISE that refers to the stream the last DATA
+ * frame was received on.
+ */
+public class Http2Handler extends Http2ConnectionHandler {
+
+    private final boolean isServer;
+    /** Id of the stream that delivered the most recent inbound DATA frame. */
+    private int lastStreamId;
+
+    private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+        final Http2Settings initialSettings, final boolean isServer) {
+        super(decoder, encoder, initialSettings);
+        decoder.frameListener(new FrameListener());
+        this.isServer = isServer;
+    }
+
+    /**
+     * Builds a fully wired handler with its own connection, frame codec and settings.
+     *
+     * @param isServer {@code true} to create the server endpoint of the connection,
+     *                 {@code false} for the client endpoint
+     * @return a new handler instance
+     */
+    public static Http2Handler newHandler(final boolean isServer) {
+
+        Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true);
+        Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
+        Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
+
+        Http2Connection connection = new DefaultHttp2Connection(isServer);
+
+        Http2ConnectionEncoder encoder = new StreamBufferingEncoder(
+            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
+
+        connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
+            DEFAULT_WINDOW_UPDATE_RATIO, true));
+
+        Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
+            frameReader);
+
+        Http2Settings settings = new Http2Settings();
+
+        // Only the client endpoint advertises that it accepts server push.
+        if (!isServer) {
+            settings.pushEnabled(true);
+        }
+
+        settings.initialWindowSize(1048576 * 10); //10MiB
+        settings.maxConcurrentStreams(Integer.MAX_VALUE);
+
+        return newHandler(decoder, encoder, settings, isServer);
+    }
+
+    private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+        final Http2Settings settings, boolean isServer) {
+        return new Http2Handler(decoder, encoder, settings, isServer);
+    }
+
+    /**
+     * Writes {@code msg} (expected to be a {@link ByteBuf}) as HTTP/2 frames.
+     *
+     * <p>Server side: the payload is sent on a promised stream whose id is
+     * {@code lastStreamId + 1}; {@code promise} completes with the DATA frame.
+     * Client side: the payload is sent as HEADERS + DATA and flushed; a failure
+     * while writing is reported through {@code promise}.
+     *
+     * @param ctx     handler context used for writing
+     * @param msg     payload to send
+     * @param promise promise notified about the outcome of the write
+     */
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg,
+        final ChannelPromise promise) throws Exception {
+        if (isServer) {
+            assert msg instanceof ByteBuf;
+            sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg, promise);
+        } else {
+
+            final Http2Headers headers = new DefaultHttp2Headers();
+
+            try {
+                // Derives an odd stream id from the current thread id (even -> +1, odd -> +2).
+                // NOTE(review): every write issued from the same thread reuses the same id - confirm intended.
+                long threadId = Thread.currentThread().getId();
+                long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2;
+                encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, promise);
+                encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise());
+                ctx.flush();
+            } catch (Exception e) {
+                // Surface the failure to whoever listens on the write instead of dumping it to stderr.
+                promise.tryFailure(e);
+            }
+
+        }
+    }
+
+    /**
+     * Sends a PUSH_PROMISE on {@code streamId} and then the response HEADERS and
+     * the payload DATA on the promised stream.
+     *
+     * @param ctx                 handler context used for writing
+     * @param streamId            stream the push promise is associated with
+     * @param pushPromiseStreamId id of the promised stream that carries the payload
+     * @param payload             data to send on the promised stream
+     * @param promise             completed when the DATA frame write finishes, so the
+     *                            caller of {@link #write} is notified
+     */
+    private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
+        ByteBuf payload, ChannelPromise promise) throws Http2Exception {
+
+        encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
+            new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise());
+
+        Http2Headers headers = new DefaultHttp2Headers();
+        headers.status(OK.codeAsText());
+        encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise());
+        encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, promise);
+    }
+
+    /** Forwards inbound DATA frames to the rest of the pipeline as raw buffers. */
+    private class FrameListener extends Http2FrameAdapter {
+        /**
+         * Remembers the stream id, passes a retained reference of {@code data}
+         * downstream and reports the whole frame as processed.
+         *
+         * @return readable bytes of the frame plus its padding
+         */
+        @Override
+        public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
+            boolean endOfStream) throws Http2Exception {
+            // Measure before handing the buffer on: downstream handlers may advance its
+            // reader index, which would shrink the count returned for flow control.
+            final int processedBytes = data.readableBytes() + padding;
+            data.retain();
+            Http2Handler.this.lastStreamId = streamId;
+            ctx.fireChannelRead(data);
+            return processedBytes;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
new file mode 100644
index 0000000..e00a213
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.ssl.SslContext;
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
+import org.apache.rocketmq.remoting.impl.channel.ChannelHandlerContextWrapperImpl;
+import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inbound handler that inspects the first byte of a connection, looks up the
+ * matching {@link Protocol} and lets it install its handlers, then removes
+ * itself from the pipeline and replays the untouched buffer.
+ */
+public class ProtocolSelector extends SimpleChannelInboundHandler<ByteBuf> {
+    public static final String NAME = ProtocolSelector.class.getSimpleName();
+    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSelector.class);
+    private ProtocolFactory protocolFactory;
+
+    /**
+     * @param sslContext SSL context handed to the protocol factory
+     */
+    public ProtocolSelector(final SslContext sslContext) {
+        this.protocolFactory = new ProtocolFactoryImpl(sslContext);
+    }
+
+    /**
+     * Selects the protocol from the leading magic byte of {@code msg}.
+     * Empty buffers are ignored; an unknown magic byte closes the channel.
+     *
+     * @param ctx context of this handler
+     * @param msg first bytes received on the connection
+     */
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
+        if (msg.readableBytes() >= 1) {
+            // Peek at the magic byte; the reader index is restored before the buffer is replayed.
+            msg.markReaderIndex();
+            final byte magic = msg.readByte();
+            final Protocol selected = protocolFactory.get(magic);
+            if (selected != null) {
+                selected.assembleHandler(new ChannelHandlerContextWrapperImpl(ctx));
+                msg.resetReaderIndex();
+                ctx.pipeline().remove(this);
+                // retain(): presumably balances the release done by SimpleChannelInboundHandler
+                // once this method returns - confirm against the Netty version in use.
+                ctx.fireChannelRead(msg.retain());
+            } else {
+                closeUnknownProtocol(ctx);
+            }
+        }
+    }
+
+    /** Closes the channel and logs whether the close succeeded. */
+    private void closeUnknownProtocol(final ChannelHandlerContext ctx) {
+        ctx.channel().close().addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                LOG.warn("Close channel {},result is {}", ctx.channel(), future.isSuccess());
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
new file mode 100644
index 0000000..0740cbb
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslContext;
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
+import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+
+/**
+ * {@link Protocol} that layers an SSL handler and an {@link Http2Handler} in
+ * front of the handlers installed by {@link RemotingCoreProtocol}.
+ */
+public class Httpv2Protocol extends RemotingCoreProtocol {
+    private final SslContext sslContext;
+
+    /**
+     * @param sslContext context used to create the SSL handler; may be {@code null}
+     *                   at construction time, but {@link #assembleHandler} then fails
+     */
+    public Httpv2Protocol(final SslContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public String name() {
+        return Protocol.HTTP2;
+    }
+
+    @Override
+    public byte type() {
+        return Protocol.HTTP_2_MAGIC;
+    }
+
+    /**
+     * Installs the parent's handlers, then inserts "sslHandler" directly after
+     * the protocol selector and "http2Handler" directly after the SSL handler.
+     *
+     * @param ctx wrapper around the Netty {@link ChannelHandlerContext}
+     * @throws IllegalStateException if no SSL context was supplied; thrown before
+     *                               the pipeline is modified
+     */
+    @Override
+    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
+        // Fail before touching the pipeline so it is never left half-assembled.
+        if (sslContext == null) {
+            throw new IllegalStateException("sslContext is required to assemble the " + Protocol.HTTP2 + " handlers");
+        }
+        super.assembleHandler(ctx);
+        ChannelHandlerContext chx = (ChannelHandlerContext) ctx.getContext();
+
+        chx.pipeline().addAfter(ProtocolSelector.NAME, "sslHandler", sslContext.newHandler(chx.alloc()));
+        chx.pipeline().addAfter("sslHandler", "http2Handler", Http2Handler.newHandler(true));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
new file mode 100644
index 0000000..15322be
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import io.netty.handler.ssl.SslContext;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
+
+/**
+ * Registry of {@link Protocol} implementations, indexed by the unsigned value
+ * of their magic type byte.
+ */
+public class ProtocolFactoryImpl implements ProtocolFactory {
+    /** Mask that maps a (signed) type byte to its unsigned value 0..255. */
+    private static final int MAX_COUNT = 0x0FF;
+    // One slot per possible byte value; MAX_COUNT itself (0xFF) is a valid index.
+    private final Protocol[] tables = new Protocol[MAX_COUNT + 1];
+
+    private SslContext sslContext;
+
+    /**
+     * Registers the built-in protocols.
+     *
+     * @param sslContext SSL context passed on to {@link Httpv2Protocol}; may be {@code null}
+     */
+    public ProtocolFactoryImpl(final SslContext sslContext) {
+        this.sslContext = sslContext;
+        this.register(new RemotingCoreProtocol());
+        this.register(new Httpv2Protocol(sslContext));
+        this.register(new WebSocketProtocol());
+    }
+
+    /** Registers the built-in protocols without an SSL context. */
+    public ProtocolFactoryImpl() {
+        this(null);
+    }
+
+    /**
+     * Adds {@code protocol} under its type byte.
+     *
+     * @throws RuntimeException if another protocol already uses the same type byte
+     */
+    @Override
+    public void register(Protocol protocol) {
+        final int slot = protocol.type() & MAX_COUNT;
+        if (tables[slot] != null) {
+            throw new RuntimeException("protocol header's sign is overlapped");
+        }
+        tables[slot] = protocol;
+    }
+
+    /** Maps every type byte to {@code protocol}. */
+    @Override
+    public void resetAll(final Protocol protocol) {
+        for (int i = 0; i < this.tables.length; i++) {
+            tables[i] = protocol;
+        }
+    }
+
+    /**
+     * Finds the type byte of the protocol registered under {@code protocolName}
+     * (compared case-insensitively).
+     *
+     * @throws IllegalArgumentException if no registered protocol has that name
+     */
+    @Override
+    public byte type(final String protocolName) {
+
+        for (int i = 0; i < this.tables.length; i++) {
+            if (this.tables[i] != null) {
+                if (this.tables[i].name().equalsIgnoreCase(protocolName)) {
+                    return this.tables[i].type();
+                }
+            }
+        }
+
+        throw new IllegalArgumentException(String.format("the protocol: %s not exist", protocolName));
+    }
+
+    /**
+     * @return the protocol registered for {@code type}, or {@code null} if there is none
+     */
+    @Override
+    public Protocol get(byte type) {
+        return tables[type & MAX_COUNT];
+    }
+
+    /** Removes every registered protocol. */
+    @Override
+    public void clearAll() {
+        for (int i = 0; i < this.tables.length; i++) {
+            this.tables[i] = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
new file mode 100644
index 0000000..317b24f
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector;
+
+/**
+ * {@link Protocol} that installs the {@link Decoder} and {@link Encoder}
+ * handlers behind the protocol selector.
+ */
+public class RemotingCoreProtocol implements Protocol {
+    @Override
+    public String name() {
+        return Protocol.MVP;
+    }
+
+    @Override
+    public byte type() {
+        return Protocol.MVP_MAGIC;
+    }
+
+    /**
+     * Adds "decoder" right after the protocol selector and "encoder" right after
+     * the decoder.
+     *
+     * @param ctx wrapper whose context is cast to a Netty {@link ChannelHandlerContext}
+     */
+    @Override
+    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
+        final ChannelHandlerContext nettyContext = (ChannelHandlerContext) ctx.getContext();
+
+        nettyContext.pipeline()
+            .addAfter(ProtocolSelector.NAME, "decoder", new Decoder())
+            .addAfter("decoder", "encoder", new Encoder());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
new file mode 100644
index 0000000..18a3a11
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol;
+
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+
+/**
+ * {@link Protocol} entry for WebSocket. It only reports its name and magic
+ * byte; no handlers are installed.
+ */
+public class WebSocketProtocol implements Protocol {
+    @Override
+    public String name() {
+        return Protocol.WEBSOCKET;
+    }
+
+    @Override
+    public byte type() {
+        return Protocol.WEBSOCKET_MAGIC;
+    }
+
+    /**
+     * Does nothing: the pipeline is left unchanged.
+     *
+     * @param ctx unused
+     */
+    @Override
+    public void assembleHandler(final ChannelHandlerContextWrapper ctx) {
+        // No handlers are added for this protocol.
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
new file mode 100644
index 0000000..10e97ba
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/CompressorFactoryImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.compression;
+
+import org.apache.rocketmq.remoting.api.compressable.Compressor;
+import org.apache.rocketmq.remoting.api.compressable.CompressorFactory;
+
+/**
+ * Registry of {@link Compressor} implementations, indexed by the unsigned
+ * value of their type byte.
+ */
+public class CompressorFactoryImpl implements CompressorFactory {
+    /** Mask that maps a (signed) type byte to its unsigned value 0..255. */
+    private static final int MAX_COUNT = 0x0FF;
+    // One slot per possible byte value; MAX_COUNT itself (0xFF) is a valid index.
+    private final Compressor[] tables = new Compressor[MAX_COUNT + 1];
+
+    /** Registers the built-in {@link GZipCompressor}. */
+    public CompressorFactoryImpl() {
+        this.register(new GZipCompressor());
+    }
+
+    /**
+     * Adds {@code compressor} under its type byte.
+     *
+     * @throws RuntimeException if another compressor already uses the same type byte
+     */
+    @Override
+    public void register(Compressor compressor) {
+        final int slot = compressor.type() & MAX_COUNT;
+        if (tables[slot] != null) {
+            throw new RuntimeException("compressor header's sign is overlapped");
+        }
+        tables[slot] = compressor;
+    }
+
+    /**
+     * Finds the type byte of the compressor registered under {@code compressionName}
+     * (compared case-insensitively).
+     *
+     * @throws IllegalArgumentException if no registered compressor has that name
+     */
+    @Override
+    public byte type(String compressionName) {
+        for (Compressor table : this.tables) {
+            if (table != null) {
+                if (table.name().equalsIgnoreCase(compressionName)) {
+                    return table.type();
+                }
+            }
+        }
+
+        throw new IllegalArgumentException(String.format("the compressor: %s not exist", compressionName));
+
+    }
+
+    /**
+     * @return the compressor registered for {@code type}, or {@code null} if there is none
+     */
+    @Override
+    public Compressor get(byte type) {
+        return tables[type & MAX_COUNT];
+    }
+
+    /** Removes every registered compressor. */
+    @Override
+    public void clearAll() {
+        for (int i = 0; i < this.tables.length; i++) {
+            this.tables[i] = null;
+        }
+    }
+
+    /**
+     * @return the internal lookup table itself (not a copy)
+     */
+    public Compressor[] getTables() {
+        return tables;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
new file mode 100644
index 0000000..fc33f4c
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/compression/GZipCompressor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.rocketmq.remoting.api.compressable.Compressor;
+
+/**
+ * {@link Compressor} backed by the JDK GZIP streams.
+ */
+public class GZipCompressor implements Compressor {
+    /** Size in bytes of the copy buffer used while streaming. */
+    public static final int BUFFER = 1024;
+    public static final String COMPRESSOR_NAME = GZipCompressor.class.getSimpleName();
+    public static final byte COMPRESSOR_TYPE = 'G';
+
+    @Override
+    public String name() {
+        return COMPRESSOR_NAME;
+    }
+
+    @Override
+    public byte type() {
+        return COMPRESSOR_TYPE;
+    }
+
+    /**
+     * Compresses {@code content} with GZIP.
+     *
+     * @param content bytes to compress; {@code null} yields an empty array
+     * @return the GZIP-compressed bytes
+     * @throws Exception if the underlying streams fail
+     */
+    @Override
+    public byte[] compress(byte[] content) throws Exception {
+        if (content == null) {
+            return new byte[0];
+        }
+
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(content);
+             ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            compress(bais, baos);
+            return baos.toByteArray();
+        }
+    }
+
+    /** Copies {@code is} into a GZIP stream on top of {@code os}, closing the GZIP stream even on failure. */
+    private void compress(InputStream is, OutputStream os) throws Exception {
+        try (GZIPOutputStream gos = new GZIPOutputStream(os)) {
+            final byte[] data = new byte[BUFFER];
+            int count;
+            while ((count = is.read(data, 0, BUFFER)) != -1) {
+                gos.write(data, 0, count);
+            }
+            // Write the GZIP trailer before the stream is closed.
+            gos.finish();
+        }
+    }
+
+    /**
+     * Decompresses GZIP data.
+     *
+     * @param content GZIP-compressed bytes; {@code null} yields an empty array
+     * @return the decompressed bytes
+     * @throws Exception if {@code content} is not valid GZIP data or the streams fail
+     */
+    @Override
+    public byte[] deCompress(byte[] content) throws Exception {
+        if (content == null) {
+            return new byte[0];
+        }
+
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(content);
+             ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            decompress(bais, baos);
+            return baos.toByteArray();
+        }
+    }
+
+    /** Inflates the GZIP data from {@code is} into {@code os}, closing the GZIP stream even on failure. */
+    private void decompress(InputStream is, OutputStream os) throws Exception {
+        try (GZIPInputStream gis = new GZIPInputStream(is)) {
+            final byte[] data = new byte[BUFFER];
+            int count;
+            while ((count = gis.read(data, 0, BUFFER)) != -1) {
+                os.write(data, 0, count);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
new file mode 100644
index 0000000..c85d44b
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/JsonSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import com.alibaba.fastjson.JSON;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+import org.apache.rocketmq.remoting.impl.command.CodecHelper;
+
+/**
+ * {@link Serializer} that converts objects to and from JSON text with fastjson,
+ * using {@link CodecHelper#REMOTING_CHARSET} for the byte representation.
+ */
+public class JsonSerializer implements Serializer {
+    public static final String SERIALIZER_NAME = JsonSerializer.class.getSimpleName();
+    public static final byte SERIALIZER_TYPE = 'J';
+
+    public JsonSerializer() {
+    }
+
+    @Override
+    public String name() {
+        return SERIALIZER_NAME;
+    }
+
+    @Override
+    public byte type() {
+        return SERIALIZER_TYPE;
+    }
+
+    /**
+     * Parses {@code content} as JSON into an instance of {@code c}.
+     *
+     * @return the parsed object, or {@code null} when {@code content} is {@code null}
+     * @throws RuntimeException wrapping any failure during parsing
+     */
+    @Override
+    public <T> T decode(final byte[] content, final Class<T> c) {
+        if (content == null) {
+            return null;
+        }
+        try {
+            final String text = new String(content, CodecHelper.REMOTING_CHARSET);
+            return JSON.parseObject(text, c);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Delegates to {@link #decode(byte[], Type)} with the type carried by {@code typePresentation}. */
+    @Override
+    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
+        return decode(content, typePresentation.getType());
+    }
+
+    /**
+     * Parses {@code content} as JSON into the given (possibly generic) {@code type}.
+     *
+     * @return the parsed object, or {@code null} when {@code content} is {@code null}
+     * @throws RuntimeException wrapping any failure during parsing
+     */
+    @Override
+    public <T> T decode(byte[] content, Type type) {
+        if (content == null) {
+            return null;
+        }
+        try {
+            final String text = new String(content, CodecHelper.REMOTING_CHARSET);
+            return JSON.parseObject(text, type);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes {@code object} to JSON.
+     *
+     * @return a buffer wrapping the encoded JSON bytes, or {@code null} when
+     *         {@code object} is {@code null}
+     */
+    @Override
+    public ByteBuffer encode(final Object object) {
+        if (object == null) {
+            return null;
+        }
+        final byte[] encoded = JSON.toJSONString(object).getBytes(CodecHelper.REMOTING_CHARSET);
+        try {
+            return ByteBuffer.wrap(encoded, 0, encoded.length);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
new file mode 100644
index 0000000..06ea217
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/Kryo3Serializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+
+/**
+ * {@link Serializer} that reads and writes objects with the Kryo instance
+ * obtained from {@code ThreadSafeKryo}. The class of the object is written
+ * into the stream together with the object itself.
+ */
+public class Kryo3Serializer implements Serializer {
+    public static final String SERIALIZER_NAME = Kryo3Serializer.class.getSimpleName();
+    public static final byte SERIALIZER_TYPE = 'K';
+
+    public Kryo3Serializer() {
+    }
+
+    @Override
+    public String name() {
+        return SERIALIZER_NAME;
+    }
+
+    @Override
+    public byte type() {
+        return SERIALIZER_TYPE;
+    }
+
+    /**
+     * Reads an object (including its class) from {@code content}.
+     *
+     * @param content serialized bytes; {@code null} yields {@code null}
+     * @param c       expected class; it only types the result, the decoded class
+     *                is the one recorded in {@code content}
+     * @return the decoded object, or {@code null} when {@code content} is {@code null}
+     * @throws RuntimeException wrapping any failure during decoding
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the stream dictates the runtime class; the cast cannot be checked here
+    public <T> T decode(final byte[] content, final Class<T> c) {
+        if (content == null) {
+            return null;
+        }
+
+        // try-with-resources closes the input without risking a NullPointerException
+        // in a finally block that would hide the original failure.
+        try (Input input = new Input(content)) {
+            return (T) ThreadSafeKryo.getKryoInstance().readClassAndObject(input);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Delegates to {@link #decode(byte[], Type)} with the type carried by {@code typePresentation}. */
+    @Override
+    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
+        return decode(content, typePresentation.getType());
+    }
+
+    /**
+     * Decodes {@code content} for a {@link Class} or for the raw class of a
+     * {@link ParameterizedType}.
+     *
+     * @return the decoded object, or {@code null} when {@code type} is of any other kind
+     *         or {@code content} is {@code null}
+     */
+    @Override
+    @SuppressWarnings("unchecked") // callers vouch that the supplied type matches T
+    public <T> T decode(byte[] content, Type type) {
+        if (type instanceof ParameterizedType) {
+            return decode(content, (Class<? extends T>) ((ParameterizedType) type).getRawType());
+        } else if (type instanceof Class) {
+            return decode(content, (Class<? extends T>) type);
+        }
+        return null;
+    }
+
+    /**
+     * Writes {@code object} together with its class.
+     *
+     * @return a buffer wrapping the written bytes, or {@code null} when
+     *         {@code object} is {@code null}
+     * @throws RuntimeException wrapping any failure during encoding
+     */
+    @Override
+    public ByteBuffer encode(final Object object) {
+        if (object == null) {
+            return null;
+        }
+
+        // Buffer starts at 1 KiB and may grow up to 6 MiB.
+        try (Output output = new Output(1024, 1024 * 1024 * 6)) {
+            ThreadSafeKryo.getKryoInstance().writeClassAndObject(output, object);
+            return ByteBuffer.wrap(output.getBuffer(), 0, output.position());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
new file mode 100644
index 0000000..1097f8f
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/MsgPackSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+import org.msgpack.MessagePack;
+import org.msgpack.template.Template;
+
+/**
+ * {@link Serializer} implementation that delegates all reading and writing to one
+ * {@link MessagePack} instance held by this object.
+ */
+public class MsgPackSerializer implements Serializer {
+    public static final String SERIALIZER_NAME = MsgPackSerializer.class.getSimpleName();
+    public static final byte SERIALIZER_TYPE = 'M';
+    private final MessagePack messagePack = new MessagePack();
+
+    public MsgPackSerializer() {
+    }
+
+    /**
+     * @return {@link #SERIALIZER_NAME}, the simple class name of this serializer
+     */
+    @Override
+    public String name() {
+        return SERIALIZER_NAME;
+    }
+
+    /**
+     * @return {@link #SERIALIZER_TYPE}, the byte {@code 'M'}
+     */
+    @Override
+    public byte type() {
+        return SERIALIZER_TYPE;
+    }
+
+    /**
+     * Reads {@code content} as an instance of {@code c}.
+     *
+     * @throws RuntimeException wrapping any exception raised by the read
+     */
+    @Override
+    public <T> T decode(final byte[] content, final Class<T> c) {
+        final T decoded;
+        try {
+            decoded = messagePack.read(content, c);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return decoded;
+    }
+
+    /**
+     * Reads {@code content} as the type carried by {@code typePresentation}, by delegating to
+     * the overload that accepts the value of {@code typePresentation.getType()}.
+     */
+    @Override
+    public <T> T decode(final byte[] content, final TypePresentation<T> typePresentation) {
+        return this.decode(content, typePresentation.getType());
+    }
+
+    /**
+     * Reads {@code content} with the template that {@code messagePack.lookup(type)} returns.
+     * The lookup happens outside the try block, so its failures are not wrapped.
+     *
+     * @throws RuntimeException wrapping any exception raised by the read
+     */
+    @Override
+    public <T> T decode(byte[] content, Type type) {
+        final Template<T> tpl = (Template<T>) messagePack.lookup(type);
+        try {
+            final T decoded = messagePack.read(content, tpl);
+            return decoded;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Writes {@code object} and wraps the resulting byte array in a {@link ByteBuffer}.
+     *
+     * @throws RuntimeException wrapping any exception raised while writing or wrapping
+     */
+    @Override
+    public ByteBuffer encode(final Object object) {
+        try {
+            return ByteBuffer.wrap(messagePack.write(object));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
new file mode 100644
index 0000000..632b61f
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializerFactoryImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+
+/**
+ * Array-backed {@link SerializerFactory}: each serializer is stored at the index given by its
+ * type byte interpreted as an unsigned value.
+ * <p>
+ * The constructor pre-registers {@code JsonSerializer}, {@code Kryo3Serializer} and
+ * {@code MsgPackSerializer}.
+ */
+public class SerializerFactoryImpl implements SerializerFactory {
+    /** Mask that maps a signed type byte onto the unsigned range 0..255. */
+    private static final int MAX_COUNT = 0x0FF;
+
+    /**
+     * One slot per possible byte value. The table must hold {@code MAX_COUNT + 1} entries:
+     * with only {@code MAX_COUNT} slots a type byte of {@code -1} (index 255 after masking)
+     * would fall outside the array.
+     */
+    private final Serializer[] tables = new Serializer[MAX_COUNT + 1];
+
+    public SerializerFactoryImpl() {
+        this.register(new JsonSerializer());
+        this.register(new Kryo3Serializer());
+        this.register(new MsgPackSerializer());
+    }
+
+    /**
+     * Stores {@code serialization} in the slot selected by its type byte.
+     *
+     * @param serialization serializer to add
+     * @throws RuntimeException if the slot for that type byte is already taken
+     */
+    @Override
+    public void register(Serializer serialization) {
+        final int slot = serialization.type() & MAX_COUNT;
+        if (tables[slot] != null) {
+            throw new RuntimeException("serialization header's sign is overlapped");
+        }
+        tables[slot] = serialization;
+    }
+
+    /**
+     * Looks up the type byte of the registered serializer whose name matches
+     * {@code serializationName}, ignoring case.
+     *
+     * @param serializationName serializer name to search for
+     * @return the type byte of the first matching serializer
+     * @throws IllegalArgumentException if no registered serializer has that name
+     */
+    @Override
+    public byte type(final String serializationName) {
+        for (Serializer candidate : this.tables) {
+            if (candidate != null && candidate.name().equalsIgnoreCase(serializationName)) {
+                return candidate.type();
+            }
+        }
+
+        throw new IllegalArgumentException(String.format("the serialization: %s not exist", serializationName));
+    }
+
+    /**
+     * @param type serializer type byte
+     * @return the serializer registered for {@code type}, or {@code null} if the slot is empty
+     */
+    @Override
+    public Serializer get(byte type) {
+        return tables[type & MAX_COUNT];
+    }
+
+    /** Empties every slot, including the serializers registered by the constructor. */
+    @Override
+    public void clearAll() {
+        for (int i = 0; i < this.tables.length; i++) {
+            this.tables[i] = null;
+        }
+    }
+
+    /**
+     * @return the live backing table (not a copy); unused slots are {@code null}
+     */
+    public Serializer[] getTables() {
+        return tables;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
new file mode 100644
index 0000000..cadfc27
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/serializer/ThreadSafeKryo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Currency;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Supplies one pre-configured {@link Kryo} instance per thread through a {@link ThreadLocal}.
+ */
+public class ThreadSafeKryo {
+    private static final ThreadLocal<Kryo> KRYOS = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            return newConfiguredKryo();
+        }
+    };
+
+    /**
+     * @return the Kryo instance bound to the calling thread, created on first use
+     */
+    public static Kryo getKryoInstance() {
+        return KRYOS.get();
+    }
+
+    /**
+     * Builds a Kryo instance and registers the supported classes.
+     * The classes are registered one by one in the order listed; keep that order unchanged.
+     */
+    private static Kryo newConfiguredKryo() {
+        final Kryo kryo = new Kryo();
+
+        // Enum.class is intentionally absent from this list.
+        registerAll(kryo,
+            byte[].class,
+            char[].class,
+            short[].class,
+            int[].class,
+            long[].class,
+            float[].class,
+            double[].class,
+            boolean[].class,
+            String[].class,
+            Object[].class,
+            KryoSerializable.class,
+            BigInteger.class,
+            BigDecimal.class,
+            Class.class,
+            Date.class,
+            EnumSet.class,
+            Currency.class,
+            StringBuffer.class,
+            StringBuilder.class,
+            Collections.EMPTY_LIST.getClass(),
+            Collections.EMPTY_MAP.getClass(),
+            Collections.EMPTY_SET.getClass(),
+            Collections.singletonList(null).getClass(),
+            Collections.singletonMap(null, null).getClass(),
+            Collections.singleton(null).getClass(),
+            TreeSet.class,
+            Collection.class,
+            TreeMap.class,
+            Map.class);
+
+        // ZoneInfo is registered at this position only when the class can be loaded.
+        try {
+            kryo.register(Class.forName("sun.util.calendar.ZoneInfo"));
+        } catch (ClassNotFoundException e) {
+            // Noop
+        }
+
+        registerAll(kryo,
+            Calendar.class,
+            Locale.class,
+            BitSet.class,
+            HashMap.class,
+            Timestamp.class,
+            ArrayList.class);
+
+        // setRegistrationRequired(true) is deliberately not called.
+        kryo.setReferences(false);
+        kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+
+        return kryo;
+    }
+
+    /** Registers {@code types} with {@code kryo}, one after another in argument order. */
+    private static void registerAll(final Kryo kryo, final Class<?>... types) {
+        for (final Class<?> type : types) {
+            kryo.register(type);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java
new file mode 100644
index 0000000..0177990
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/BeanUtils.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reflection helpers that fill the fields of configuration objects from properties and
+ * render such objects as {@code name=value} text.
+ */
+public final class BeanUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(BeanUtils.class);
+
+    /**
+     * Creates an instance of {@code clazz} through its no-argument constructor and fills its
+     * fields as described in {@link #populate(Properties, Object)}.
+     *
+     * <p>Fields are written directly through reflection; no setter methods are invoked.
+     * Failures are logged at WARN level and never propagated.</p>
+     *
+     * @param properties values to apply; keys are matched case-insensitively
+     * @param clazz class to instantiate and populate
+     * @param <T> Class type
+     * @return the populated instance, or {@code null} if the instance could not be created
+     */
+    public static <T> T populate(final Properties properties, final Class<T> clazz) {
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            LOG.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    /**
+     * Assigns one field from {@code properties} if a value is present.
+     *
+     * <p>Two keys are tried: the env-style name (see {@link #convertToEnvName(String)}) and the
+     * config-style name (see {@link #convertToConfigName(String)}); the config-style value wins
+     * when both exist. {@code boolean}, {@code int}, {@code double}, {@code float} and
+     * {@code long} fields are parsed from the text; every other field type receives the raw
+     * {@code String}.</p>
+     *
+     * @throws Exception if parsing or the reflective assignment fails
+     */
+    private static <T> void setField(final Field field, final Properties properties, final T obj) throws Exception {
+        Type fieldType = field.getType();
+        String fieldName = field.getName();
+
+        String value = null;
+        String configName = convertToConfigName(fieldName);
+        String envName = convertToEnvName(fieldName);
+
+        if (properties.containsKey(envName)) {
+            value = properties.getProperty(envName);
+        }
+
+        // Checked second on purpose: the config-style key overrides the env-style key.
+        if (properties.containsKey(configName)) {
+            value = properties.getProperty(configName);
+        }
+
+        if (value == null) {
+            return;
+        }
+
+        if (fieldType == Boolean.TYPE) {
+            field.set(obj, Boolean.valueOf(value));
+        } else if (fieldType == Integer.TYPE) {
+            field.set(obj, Integer.valueOf(value));
+        } else if (fieldType == Double.TYPE) {
+            field.set(obj, Double.valueOf(value));
+        } else if (fieldType == Float.TYPE) {
+            field.set(obj, Float.valueOf(value));
+        } else if (fieldType == Long.TYPE) {
+            field.set(obj, Long.valueOf(value));
+        } else {
+            field.set(obj, value);
+        }
+    }
+
+    /**
+     * Converts a camelCase field name to a dotted lower-case key,
+     * e.g. {@code ioThreads} becomes {@code io.threads}.
+     */
+    private static String convertToConfigName(String variableName) {
+        StringBuilder sb = new StringBuilder();
+        for (char c : variableName.toCharArray()) {
+            if (Character.isUpperCase(c)) {
+                sb.append('.');
+            }
+            sb.append(Character.toLowerCase(c));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Converts a camelCase field name to an upper-case, underscore-separated key,
+     * e.g. {@code ioThreads} becomes {@code IO_THREADS}.
+     */
+    private static String convertToEnvName(String variableName) {
+        StringBuilder sb = new StringBuilder();
+        for (char c : variableName.toCharArray()) {
+            if (Character.isUpperCase(c)) {
+                sb.append('_');
+            }
+            sb.append(Character.toUpperCase(c));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Fills every non-static field of {@code obj}, including fields declared by superclasses,
+     * from the merged view built by {@link #extractProperties(Properties)}.
+     *
+     * <p>A failure on one field (unparsable number, incompatible field type, inaccessible
+     * field) is logged at WARN level and does not stop the remaining fields from being
+     * populated.</p>
+     *
+     * @param properties values to apply; keys are matched case-insensitively
+     * @param obj object whose fields are written
+     * @param <T> Class type
+     * @return {@code obj}
+     */
+    public static <T> T populate(final Properties properties, final T obj) {
+        List<Field> allFields = getAllFields(new ArrayList<Field>(), obj.getClass());
+        Properties fullProp = extractProperties(properties);
+
+        for (Field field : allFields) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            try {
+                field.setAccessible(true);
+                setField(field, fullProp, obj);
+            } catch (Exception e) {
+                LOG.warn("Error occurs !", e);
+            }
+        }
+        return obj;
+    }
+
+    /**
+     * Renders the non-static fields of {@code object} (superclass fields included) as
+     * {@code name=value} lines terminated by the platform line separator.
+     *
+     * <p>Fields whose name starts with {@code this} are skipped. A {@code null} value is
+     * printed as an empty string; a field that cannot be read is printed as {@code null}.</p>
+     *
+     * @param object object to describe
+     * @return one line per field
+     */
+    public static String configObjectToString(final Object object) {
+        List<Field> allFields = new ArrayList<>();
+        getAllFields(allFields, object.getClass());
+        // "%n" is only expanded inside format strings; appended verbatim it would show up
+        // as the two characters '%' and 'n', so the real separator is used instead.
+        final String lineEnd = System.lineSeparator();
+        StringBuilder sb = new StringBuilder();
+        for (Field field : allFields) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            String name = field.getName();
+            if (name.startsWith("this")) {
+                continue;
+            }
+            Object value = null;
+            try {
+                field.setAccessible(true);
+                value = field.get(object);
+                if (null == value) {
+                    value = "";
+                }
+            } catch (IllegalAccessException ignored) {
+                // Best effort: the unreadable field is still listed, with value "null".
+            }
+            sb.append(name).append("=").append(value).append(lineEnd);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Appends the declared fields of {@code type} and then, recursively, of each superclass.
+     *
+     * @return the same {@code fields} list that was passed in
+     */
+    private static List<Field> getAllFields(List<Field> fields, Class<?> type) {
+        fields.addAll(Arrays.asList(type.getDeclaredFields()));
+        if (type.getSuperclass() != null) {
+            getAllFields(fields, type.getSuperclass());
+        }
+        return fields;
+    }
+
+    /**
+     * Builds the merged lookup table used by {@link #populate(Properties, Object)}.
+     *
+     * <p>Sources are applied in this order, later ones overriding earlier ones: environment
+     * variables (stored under both the upper-cased and the lower-cased name), JVM system
+     * properties, the defaults of {@code properties}, and finally the entries of
+     * {@code properties} itself. Apart from the upper-cased environment names, all keys are
+     * lower-cased with {@code Locale.ROOT} so the result does not depend on the default
+     * locale.</p>
+     */
+    private static Properties extractProperties(final Properties properties) {
+        Properties newPro = new Properties();
+
+        Map<String, String> envMap = System.getenv();
+        for (final Map.Entry<String, String> entry : envMap.entrySet()) {
+            newPro.setProperty(entry.getKey().toUpperCase(java.util.Locale.ROOT), entry.getValue());
+            newPro.setProperty(entry.getKey().toLowerCase(java.util.Locale.ROOT), entry.getValue());
+        }
+
+        Properties systemProp = System.getProperties();
+        for (final Map.Entry<Object, Object> entry : systemProp.entrySet()) {
+            newPro.setProperty(String.valueOf(entry.getKey()).toLowerCase(java.util.Locale.ROOT),
+                String.valueOf(entry.getValue()));
+        }
+
+        // stringPropertyNames() also reports keys that exist only in the defaults chain.
+        // It replaces a reflective read of the private Properties.defaults field, which is
+        // rejected on JDK 16+ and made the defaults disappear silently.
+        for (final String key : properties.stringPropertyNames()) {
+            final String value = properties.getProperty(key);
+            if (value != null) {
+                newPro.setProperty(key.toLowerCase(java.util.Locale.ROOT), value);
+            }
+        }
+
+        // Own entries go last so they win over defaults that differ only in key case.
+        for (final Map.Entry<Object, Object> entry : properties.entrySet()) {
+            newPro.setProperty(String.valueOf(entry.getKey()).toLowerCase(java.util.Locale.ROOT),
+                String.valueOf(entry.getValue()));
+        }
+
+        return newPro;
+    }
+}