You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/12/13 01:11:46 UTC

svn commit: r1213507 - in /avro/trunk: CHANGES.txt lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java

Author: cutting
Date: Tue Dec 13 00:11:45 2011
New Revision: 1213507

URL: http://svn.apache.org/viewvc?rev=1213507&view=rev
Log:
Java: Extend NettyServer to permit specification of an ExecutionHandler, to handle multiple requests simultaneously.  Contributed by Bruno Dumon.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1213507&r1=1213506&r2=1213507&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Dec 13 00:11:45 2011
@@ -22,6 +22,9 @@ Avro 1.6.2 (unreleased)
     AVRO-965. Java: Enhance IDL to support properties for protocols
     and messages. (George Fletcher via cutting)
 
+    AVRO-976. Java: Extend NettyServer to permit specification of an
+    ExecutionHandler, to handle multiple requests simultaneously.
+
   BUG FIXES
 
     AVRO-962. Java: Fix Maven plugin to support string type override.

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1213507&r1=1213506&r2=1213507&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Tue Dec 13 00:11:45 2011
@@ -44,6 +44,7 @@ import org.jboss.netty.channel.group.Cha
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +62,7 @@ public class NettyServer implements Serv
       "avro-netty-server");
   private final ChannelFactory channelFactory;
   private final CountDownLatch closed = new CountDownLatch(1);
+  private final ExecutionHandler executionHandler;            
   
   public NettyServer(Responder responder, InetSocketAddress addr) {
     this(responder, addr, new NioServerSocketChannelFactory
@@ -69,23 +71,39 @@ public class NettyServer implements Serv
   
   public NettyServer(Responder responder, InetSocketAddress addr,
                      ChannelFactory channelFactory) {
-    this.responder = responder;
-    this.channelFactory = channelFactory;
-    ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
-    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline p = Channels.pipeline();
-        p.addLast("frameDecoder", new NettyFrameDecoder());
-        p.addLast("frameEncoder", new NettyFrameEncoder());
-        p.addLast("handler", new NettyServerAvroHandler());
-        return p;
-      }
-    });
-    serverChannel = bootstrap.bind(addr);
-    allChannels.add(serverChannel);
+      this(responder, addr, channelFactory, null);
   }
 
+    /**
+     * Creates a server bound to addr, with an optional ExecutionHandler.
+     * @param executionHandler if not null, will be inserted into the Netty
+     *                         pipeline. Use this when your responder does
+     *                         long, non-CPU-bound processing (see Netty's
+     *                         ExecutionHandler Javadoc).
+     */
+  public NettyServer(Responder responder, InetSocketAddress addr,
+                     ChannelFactory channelFactory, final ExecutionHandler executionHandler) {
+      this.responder = responder;
+      this.channelFactory = channelFactory;
+      this.executionHandler = executionHandler;
+      ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
+      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+          @Override
+          public ChannelPipeline getPipeline() throws Exception {
+              ChannelPipeline p = Channels.pipeline();
+              p.addLast("frameDecoder", new NettyFrameDecoder());
+              p.addLast("frameEncoder", new NettyFrameEncoder());
+              if (executionHandler != null) {
+                  p.addLast("executionHandler", executionHandler);
+              }
+              p.addLast("handler", new NettyServerAvroHandler());
+              return p;
+          }
+      });
+      serverChannel = bootstrap.bind(addr);
+      allChannels.add(serverChannel);
+  }
+    
   @Override
   public void start() {
     // No-op.