You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/08 17:28:13 UTC

svn commit: r1348116 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Fri Jun  8 15:28:12 2012
New Revision: 1348116

URL: http://svn.apache.org/viewvc?rev=1348116&view=rev
Log:
CAMEL-5225: Refactored camel-netty to use factory for pipeline to support stateful pipeline decoders/encoders.

Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Fri Jun  8 15:28:12 2012
@@ -22,24 +22,20 @@ import org.jboss.netty.channel.ChannelPi
 /**
  * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyProducer}.
  * <p/>
- * Implementators should use implement the {@link #getPipeline(NettyProducer)} method.
+ * Implementors must support creating a new instance of this factory which is associated
+ * to the given {@link NettyProducer} using the {@link #createPipelineFactory(NettyProducer)}
+ * method.
  *
  * @see ChannelPipelineFactory
  */
 public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
 
-    public ClientPipelineFactory() {
-    }
-
     /**
-     * Returns a newly created {@link ChannelPipeline}.
+     * Creates a new {@link ClientPipelineFactory} using the given {@link NettyProducer}
      *
-     * @param producer the netty producer
+     * @param producer the associated producer
+     * @return the {@link ClientPipelineFactory} associated to the given producer.
      */
-    public abstract ChannelPipeline getPipeline(NettyProducer producer) throws Exception;
+    public abstract ClientPipelineFactory createPipelineFactory(NettyProducer producer);
 
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        throw new UnsupportedOperationException("use getPipeline(NettyProducer) instead");
-    }
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java Fri Jun  8 15:28:12 2012
@@ -30,11 +30,16 @@ import org.jboss.netty.handler.ssl.SslHa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultClientPipelineFactory extends ClientPipelineFactory {
+public class DefaultClientPipelineFactory extends ClientPipelineFactory  {
     private static final transient Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
 
-    @Override
-    public ChannelPipeline getPipeline(NettyProducer producer) throws Exception {
+    private NettyProducer producer;
+
+    public DefaultClientPipelineFactory(NettyProducer producer) {
+        this.producer = producer;
+    }
+
+    public ChannelPipeline getPipeline() throws Exception {
         // create a new pipeline
         ChannelPipeline channelPipeline = Channels.pipeline();
 
@@ -93,4 +98,8 @@ public class DefaultClientPipelineFactor
         }
     }
 
+    @Override
+    public ClientPipelineFactory createPipelineFactory(NettyProducer producer) {
+        return new DefaultClientPipelineFactory(producer);
+    }
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java Fri Jun  8 15:28:12 2012
@@ -33,8 +33,14 @@ import org.slf4j.LoggerFactory;
 public class DefaultServerPipelineFactory extends ServerPipelineFactory {
     private static final transient Logger LOG = LoggerFactory.getLogger(DefaultServerPipelineFactory.class);
 
+    private final NettyConsumer consumer;
+
+    public DefaultServerPipelineFactory(NettyConsumer consumer) {
+        this.consumer = consumer;
+    }
+
     @Override
-    public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception {
+    public ChannelPipeline getPipeline() throws Exception {
         ChannelPipeline channelPipeline = Channels.pipeline();
 
         SslHandler sslHandler = configureServerSSLOnDemand(consumer);
@@ -81,4 +87,9 @@ public class DefaultServerPipelineFactor
             return new SslHandler(sslEngine);
         }
     }
+
+    @Override
+    public ServerPipelineFactory createPipelineFactory(NettyConsumer consumer) {
+        return new DefaultServerPipelineFactory(consumer);
+    }
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Fri Jun  8 15:28:12 2012
@@ -26,7 +26,6 @@ import org.jboss.netty.bootstrap.Connect
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -70,9 +69,11 @@ public class NettyConsumer extends Defau
         LOG.debug("Netty consumer binding to: {}", configuration.getAddress());
 
         // setup pipeline factory
-        pipelineFactory = configuration.getServerPipelineFactory();
-        if (pipelineFactory == null) {
-            pipelineFactory = new DefaultServerPipelineFactory();
+        ServerPipelineFactory factory = configuration.getServerPipelineFactory();
+        if (factory != null) {
+            pipelineFactory = factory.createPipelineFactory(this);
+        } else {
+            pipelineFactory = new DefaultServerPipelineFactory(this);
         }
 
         if (isTcp()) {
@@ -180,9 +181,8 @@ public class NettyConsumer extends Defau
         serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
         serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
 
-        // must get the pipeline from the factory when opening a new connection
-        ChannelPipeline serverPipeline = pipelineFactory.getPipeline(this);
-        serverBootstrap.setPipeline(serverPipeline);
+        // set the pipeline factory, which creates the pipeline for each newly created channel
+        serverBootstrap.setPipelineFactory(pipelineFactory);
 
         channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
         // to keep track of all channels in use
@@ -211,9 +211,8 @@ public class NettyConsumer extends Defau
                 new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor()));
         }
 
-        // must get the pipeline from the factory when opening a new connection
-        ChannelPipeline serverPipeline = pipelineFactory.getPipeline(this);
-        connectionlessServerBootstrap.setPipeline(serverPipeline);
+        // set the pipeline factory, which creates the pipeline for each newly created channel
+        connectionlessServerBootstrap.setPipelineFactory(pipelineFactory);
 
         channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
         // to keep track of all channels in use

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Fri Jun  8 15:28:12 2012
@@ -37,7 +37,6 @@ import org.jboss.netty.channel.ChannelFa
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelLocal;
-import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -94,9 +93,11 @@ public class NettyProducer extends Defau
         super.doStart();
 
         // setup pipeline factory
-        pipelineFactory = configuration.getClientPipelineFactory();
-        if (pipelineFactory == null) {
-            pipelineFactory = new DefaultClientPipelineFactory();
+        ClientPipelineFactory factory = configuration.getClientPipelineFactory();
+        if (factory != null) {
+            pipelineFactory = factory.createPipelineFactory(this);
+        } else {
+            pipelineFactory = new DefaultClientPipelineFactory(this);
         }
 
         if (isTcp()) {
@@ -273,10 +274,6 @@ public class NettyProducer extends Defau
 
     private ChannelFuture openConnection() throws Exception {
         ChannelFuture answer;
-        ChannelPipeline clientPipeline;
-
-        // must get the pipeline from the factory when opening a new connection
-        clientPipeline = pipelineFactory.getPipeline(this);
 
         if (isTcp()) {
             ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
@@ -285,8 +282,8 @@ public class NettyProducer extends Defau
             clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
             clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout());
 
-            // set the pipeline on the bootstrap
-            clientBootstrap.setPipeline(clientPipeline);
+            // set the pipeline factory, which creates the pipeline for each newly created channel
+            clientBootstrap.setPipelineFactory(pipelineFactory);
             answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
             return answer;
         } else {
@@ -299,8 +296,8 @@ public class NettyProducer extends Defau
             connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
             connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
 
-            // set the pipeline on the bootstrap
-            connectionlessClientBootstrap.setPipeline(clientPipeline);
+            // set the pipeline factory, which creates the pipeline for each newly created channel
+            connectionlessClientBootstrap.setPipelineFactory(pipelineFactory);
             // bind and store channel so we can close it when stopping
             Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0));
             ALL_CHANNELS.add(channel);

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Fri Jun  8 15:28:12 2012
@@ -22,22 +22,20 @@ import org.jboss.netty.channel.ChannelPi
 /**
  * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyConsumer}.
  * <p/>
- * Implementators should use implement the {@link #getPipeline(NettyConsumer)} method.
+ * Implementators must support creating a new instance of this factory which is associated
+ * to the given {@link NettyConsumer} using the {@link #createPipelineFactory(NettyConsumer)}
+ * method.
  *
  * @see ChannelPipelineFactory
  */
 public abstract class ServerPipelineFactory implements ChannelPipelineFactory {
 
     /**
-     * Returns a newly created {@link ChannelPipeline}.
+     * Creates a new {@link ServerPipelineFactory} using the given {@link NettyConsumer}
      *
-     * @param consumer the netty consumer
+     * @param consumer the associated consumer
+     * @return the {@link ServerPipelineFactory} associated to the given consumer.
      */
-    public abstract ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception;
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        throw new UnsupportedOperationException("use getPipeline(NettyConsumer) instead");
-    }
+    public abstract ServerPipelineFactory createPipelineFactory(NettyConsumer consumer);
 
 }

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java Fri Jun  8 15:28:12 2012
@@ -37,17 +37,15 @@ public class NettyCustomPipelineFactoryA
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate producerTemplate;
-    private TestClientChannelPipelineFactory clientPipelineFactory;
-    private TestServerChannelPipelineFactory serverPipelineFactory;
+    private volatile boolean clientInvoked;
+    private volatile boolean serverInvoked;
     private String response;
     
     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry registry = super.createRegistry();
-        clientPipelineFactory = new TestClientChannelPipelineFactory();
-        serverPipelineFactory = new TestServerChannelPipelineFactory();
-        registry.bind("cpf", clientPipelineFactory);
-        registry.bind("spf", serverPipelineFactory);
+        registry.bind("cpf", new TestClientChannelPipelineFactory(null));
+        registry.bind("spf", new TestServerChannelPipelineFactory(null));
         return registry;
     }
     
@@ -76,46 +74,55 @@ public class NettyCustomPipelineFactoryA
             }
         });
         context.start();
-        
+
         sendRequest();
         context.stop();
         
         assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response);
-        assertEquals(true, clientPipelineFactory.isFactoryInvoked());
-        assertEquals(true, serverPipelineFactory.isFactoryInvoked());
-    } 
-    
+        assertEquals(true, clientInvoked);
+        assertEquals(true, serverInvoked);
+    }
+
     public class TestClientChannelPipelineFactory extends ClientPipelineFactory {
         private int maxLineSize = 1024;
-        private boolean invoked;
+        private NettyProducer producer;
+
+        public TestClientChannelPipelineFactory(NettyProducer producer) {
+            this.producer = producer;
+        }
 
         @Override
-        public ChannelPipeline getPipeline(NettyProducer producer) throws Exception {
-            invoked = true;
-            
+        public ChannelPipeline getPipeline() throws Exception {
+            clientInvoked = true;
+
             ChannelPipeline channelPipeline = Channels.pipeline();
 
             channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
             channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
-            channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));            
+            channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
             channelPipeline.addLast("handler", new ClientChannelHandler(producer));
 
             return channelPipeline;
         }
-        
-        public boolean isFactoryInvoked() {
-            return invoked;
+
+        @Override
+        public ClientPipelineFactory createPipelineFactory(NettyProducer producer) {
+            return new TestClientChannelPipelineFactory(producer);
         }
     }
-    
+
     public class TestServerChannelPipelineFactory extends ServerPipelineFactory {
         private int maxLineSize = 1024;
-        private boolean invoked;
+        private NettyConsumer consumer;
+
+        public TestServerChannelPipelineFactory(NettyConsumer consumer) {
+            this.consumer = consumer;
+        }
 
         @Override
-        public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception {
-            invoked = true;
-            
+        public ChannelPipeline getPipeline() throws Exception {
+            serverInvoked = true;
+
             ChannelPipeline channelPipeline = Channels.pipeline();
 
             channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
@@ -125,10 +132,10 @@ public class NettyCustomPipelineFactoryA
 
             return channelPipeline;
         }
-        
-        public boolean isFactoryInvoked() {
-            return invoked;
+
+        @Override
+        public ServerPipelineFactory createPipelineFactory(NettyConsumer consumer) {
+            return new TestServerChannelPipelineFactory(consumer);
         }
-        
     }
 }

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=1348116&r1=1348115&r2=1348116&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java Fri Jun  8 15:28:12 2012
@@ -37,17 +37,15 @@ public class NettyCustomPipelineFactoryS
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate producerTemplate;
-    private TestClientChannelPipelineFactory clientPipelineFactory;
-    private TestServerChannelPipelineFactory serverPipelineFactory;
+    private volatile boolean clientInvoked;
+    private volatile boolean serverInvoked;
     private String response;
     
     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry registry = super.createRegistry();
-        clientPipelineFactory = new TestClientChannelPipelineFactory();
-        serverPipelineFactory = new TestServerChannelPipelineFactory();
-        registry.bind("cpf", clientPipelineFactory);
-        registry.bind("spf", serverPipelineFactory);
+        registry.bind("cpf", new TestClientChannelPipelineFactory(null));
+        registry.bind("spf", new TestServerChannelPipelineFactory(null));
         return registry;
     }
     
@@ -81,41 +79,50 @@ public class NettyCustomPipelineFactoryS
         context.stop();
         
         assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response);
-        assertEquals(true, clientPipelineFactory.isFactoryInvoked());
-        assertEquals(true, serverPipelineFactory.isFactoryInvoked());
-    } 
-    
+        assertEquals(true, clientInvoked);
+        assertEquals(true, serverInvoked);
+    }
+
     public class TestClientChannelPipelineFactory extends ClientPipelineFactory {
         private int maxLineSize = 1024;
-        private boolean invoked;
+        private NettyProducer producer;
+
+        public TestClientChannelPipelineFactory(NettyProducer producer) {
+            this.producer = producer;
+        }
 
         @Override
-        public ChannelPipeline getPipeline(NettyProducer producer) throws Exception {
-            invoked = true;
-            
+        public ChannelPipeline getPipeline() throws Exception {
+            clientInvoked = true;
+
             ChannelPipeline channelPipeline = Channels.pipeline();
 
             channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
             channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
-            channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));            
+            channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
             channelPipeline.addLast("handler", new ClientChannelHandler(producer));
 
             return channelPipeline;
         }
-        
-        public boolean isFactoryInvoked() {
-            return invoked;
+
+        @Override
+        public ClientPipelineFactory createPipelineFactory(NettyProducer producer) {
+            return new TestClientChannelPipelineFactory(producer);
         }
     }
-    
+
     public class TestServerChannelPipelineFactory extends ServerPipelineFactory {
         private int maxLineSize = 1024;
-        private boolean invoked;
+        private NettyConsumer consumer;
+
+        public TestServerChannelPipelineFactory(NettyConsumer consumer) {
+            this.consumer = consumer;
+        }
 
         @Override
-        public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception {
-            invoked = true;
-            
+        public ChannelPipeline getPipeline() throws Exception {
+            serverInvoked = true;
+
             ChannelPipeline channelPipeline = Channels.pipeline();
 
             channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
@@ -125,9 +132,10 @@ public class NettyCustomPipelineFactoryS
 
             return channelPipeline;
         }
-        
-        public boolean isFactoryInvoked() {
-            return invoked;
+
+        @Override
+        public ServerPipelineFactory createPipelineFactory(NettyConsumer consumer) {
+            return new TestServerChannelPipelineFactory(consumer);
         }
     }