You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/03/19 18:23:18 UTC

[GitHub] [phoenix-omid] joshelser commented on a change in pull request #92: OMID-202 Refactor Omid to use Netty 4

joshelser commented on a change in pull request #92:
URL: https://github.com/apache/phoenix-omid/pull/92#discussion_r597882282



##########
File path: transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
##########
@@ -342,7 +344,7 @@ public void nodeChanged() throws Exception {
         setTSOAddress(hp.getHost(), hp.getPort());
         epoch = Long.parseLong(currentTSOAndEpochArray[1]);
         LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
-        if (currentChannel != null && currentChannel.isConnected()) {
+        if (currentChannel != null && currentChannel.isActive()) {

Review comment:
       Might be nice to consolidate this into a helper since you're modifying it.

##########
File path: transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
##########
@@ -513,7 +515,7 @@ void decrementRetries() {
         }
 
         public StateMachine.State handleEvent(CloseEvent e) {
-            factory.releaseExternalResources();
+            bootstrap.config().group().shutdownGracefully();

Review comment:
       If you have two racing `CloseEvent` messages, does `shutdownGracefully()` handle this correctly (i.e., is it synchronized, and does it exit quietly on the second call)?

##########
File path: transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
##########
@@ -1012,37 +1016,34 @@ private void closeChannelAndErrorRequests() {
         }
 
         @Override
-        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-            currentChannel = e.getChannel();
-            LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
-            fsm.sendEvent(new ConnectedEvent(e.getChannel()));
+        public void channelActive(ChannelHandlerContext ctx) {
+            currentChannel = ctx.channel();
+            LOG.debug("HANDLER (CHANNEL ACTIVE): Connection {}. Sending connected event to FSM", ctx.channel());
+            fsm.sendEvent(new ConnectedEvent(ctx.channel()));
         }
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-            LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            LOG.debug("HANDLER (CHANNEL INACTIVE): Connection {}. Sending error, then channelClosed event to FSM", ctx.channel());
+            // Netty 3 had separate callbacks, and the FSM expects both events.
+            // Sending both is much easier than rewriting the FSM

Review comment:
       Are there no issues if we send both of these events back? I'm still looking to see where they are consumed.

##########
File path: tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
##########
@@ -58,39 +62,39 @@
     private final Channel channel;
 
     public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
-        // Start client with Nb of active threads = 3 as maximum.
-        ChannelFactory factory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
-        // Create the bootstrap
-        ClientBootstrap bootstrap = new ClientBootstrap(factory);
 
         InetSocketAddress addr = new InetSocketAddress(host, port);
 
-        ChannelPipeline pipeline = bootstrap.getPipeline();
-        pipeline.addLast("lengthbaseddecoder",
-                new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("protobufdecoder",
-                new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
-        pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
-        Handler handler = new Handler();
-        pipeline.addLast("handler", handler);
-
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-        bootstrap.setOption("connectTimeoutMillis", 100);
+        // Start client with Nb of active threads = 3
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(3, workerThreadFactory);

Review comment:
       The old code appears to have had separate boss and worker thread pools. Was the old boss pool unused?

##########
File path: tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
##########
@@ -17,95 +17,116 @@
  */
 package org.apache.omid.tso;
 
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.ThreadFactory;
+
+import javax.inject.Inject;
+
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.Executors;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 /**
  * ChannelHandler for the TSO Server.
  *
  * Incoming requests are processed in this class
  */
-public class TSOChannelHandler extends SimpleChannelHandler implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
+// Marked sharable, as all global members used in callbacks are singletons.
+@Sharable
+public class TSOChannelHandler extends ChannelInboundHandlerAdapter implements Closeable {
 
-    private final ChannelFactory factory;
+    private final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
 
     private final ServerBootstrap bootstrap;
 
     @VisibleForTesting
     Channel listeningChannel;
     @VisibleForTesting
-    ChannelGroup channelGroup;
+    ChannelGroup allChannels;
 
     private RequestProcessor requestProcessor;
 
     private TSOServerConfig config;
 
     private MetricsRegistry metrics;
 
+    private static final AttributeKey<TSOChannelContext> TSO_CTX =
+            AttributeKey.valueOf("TSO_CTX");
+
     @Inject
     public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
 
         this.config = config;
         this.metrics = metrics;
         this.requestProcessor = requestProcessor;
-        // Setup netty listener
-        this.factory = new NioServerSocketChannelFactory(
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
-                (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
-
-        this.bootstrap = new ServerBootstrap(factory);
-        bootstrap.setPipelineFactory(new TSOPipelineFactory(this));
 
+        // Setup netty listener
+        int workerThreadCount= (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2;
+        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreadCount, workerThreadFactory);
+        EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadFactory);
+
+        this.bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,  workerGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel channel) throws Exception {
+                ChannelPipeline pipeline = channel.pipeline();
+                // Max packet length is 10MB. Transactions with so many cells
+                // that the packet is rejected will receive a ServiceUnavailableException.

Review comment:
       Yuck. Fine for the Netty 3 -> 4 upgrade, but it would be much better if we caught that and threw a meaningful error back. I'd think we could do that with an extension of `LengthFieldBasedFrameDecoder`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org