You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/08 15:41:45 UTC

svn commit: r1310999 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Sun Apr  8 13:41:45 2012
New Revision: 1310999

URL: http://svn.apache.org/viewvc?rev=1310999&view=rev
Log:
CAMEL-4960: Pipeline factories is now stateless and thread-safe.

Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Sun Apr  8 13:41:45 2012
@@ -18,28 +18,28 @@ package org.apache.camel.component.netty
 
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
 
+/**
+ * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyProducer}.
+ * <p/>
+ * Implementators should use implement the {@link #getPipeline(NettyProducer)} method.
+ *
+ * @see ChannelPipelineFactory
+ */
 public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
-    protected NettyProducer producer;
 
     public ClientPipelineFactory() {
     }
 
-    public ClientPipelineFactory(NettyProducer producer) {
-        this.producer = producer;
-    }
-    
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline channelPipeline = Channels.pipeline();
-        return channelPipeline;
-    }
+    /**
+     * Returns a newly created {@link ChannelPipeline}.
+     *
+     * @param producer the netty producer
+     */
+    public abstract ChannelPipeline getPipeline(NettyProducer producer) throws Exception;
 
-    public NettyProducer getProducer() {
-        return producer;
-    }
-
-    public void setProducer(NettyProducer producer) {
-        this.producer = producer;
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+        throw new UnsupportedOperationException("use getPipeline(NettyProducer) instead");
     }
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java Sun Apr  8 13:41:45 2012
@@ -33,15 +33,12 @@ import org.slf4j.LoggerFactory;
 public class DefaultClientPipelineFactory extends ClientPipelineFactory {
     private static final transient Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
 
-    public DefaultClientPipelineFactory(NettyProducer producer) {
-        super(producer);
-    }
-
-    public ChannelPipeline getPipeline() throws Exception {
+    @Override
+    public ChannelPipeline getPipeline(NettyProducer producer) throws Exception {
         // create a new pipeline
         ChannelPipeline channelPipeline = Channels.pipeline();
 
-        SslHandler sslHandler = configureClientSSLOnDemand();
+        SslHandler sslHandler = configureClientSSLOnDemand(producer);
         if (sslHandler != null) {
             LOG.debug("Client SSL handler configured and added to the ChannelPipeline");
             channelPipeline.addLast("ssl", sslHandler);
@@ -63,7 +60,7 @@ public class DefaultClientPipelineFactor
         return channelPipeline;
     }
 
-    private SslHandler configureClientSSLOnDemand() throws Exception {
+    private SslHandler configureClientSSLOnDemand(NettyProducer producer) throws Exception {
         if (!producer.getConfiguration().isSsl()) {
             return null;
         }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java Sun Apr  8 13:41:45 2012
@@ -17,7 +17,6 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
-
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
@@ -25,25 +24,20 @@ import org.apache.camel.component.netty.
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultServerPipelineFactory implements ChannelPipelineFactory {
+public class DefaultServerPipelineFactory extends ServerPipelineFactory {
     private static final transient Logger LOG = LoggerFactory.getLogger(DefaultServerPipelineFactory.class);
-    private NettyConsumer consumer;
-        
-    public DefaultServerPipelineFactory(NettyConsumer consumer) {
-        this.consumer = consumer; 
-    }    
 
-    public ChannelPipeline getPipeline() throws Exception {
+    @Override
+    public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception {
         ChannelPipeline channelPipeline = Channels.pipeline();
 
-        SslHandler sslHandler = configureServerSSLOnDemand();
+        SslHandler sslHandler = configureServerSSLOnDemand(consumer);
         if (sslHandler != null) {
             LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline");
             channelPipeline.addLast("ssl", sslHandler);            
@@ -64,7 +58,7 @@ public class DefaultServerPipelineFactor
         return channelPipeline;
     }
     
-    private SslHandler configureServerSSLOnDemand() throws Exception {
+    private SslHandler configureServerSSLOnDemand(NettyConsumer consumer) throws Exception {
         if (!consumer.getConfiguration().isSsl()) {
             return null;
         }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java Sun Apr  8 13:41:45 2012
@@ -20,7 +20,11 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 
 /**
- *
+ * Stores state for {@link NettyProducer} when sending messages.
+ * <p/>
+ * This allows the {@link org.apache.camel.component.netty.handlers.ClientChannelHandler} to access
+ * this state, which is needed so we can get hold of the current {@link Exchange} and the
+ * {@link AsyncCallback} so we can continue routing the message in the Camel routing engine.
  */
 public final class NettyCamelState {
 

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Sun Apr  8 13:41:45 2012
@@ -26,6 +26,7 @@ import org.jboss.netty.bootstrap.Connect
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -45,6 +46,7 @@ public class NettyConsumer extends Defau
     private DatagramChannelFactory datagramChannelFactory;
     private ServerBootstrap serverBootstrap;
     private ConnectionlessBootstrap connectionlessServerBootstrap;
+    private ServerPipelineFactory pipelineFactory;
     private Channel channel;
     private ExecutorService bossExecutor;
     private ExecutorService workerExecutor;
@@ -63,9 +65,16 @@ public class NettyConsumer extends Defau
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
+
         LOG.debug("Netty consumer binding to: {}", configuration.getAddress());
 
-        super.doStart();
+        // setup pipeline factory
+        pipelineFactory = configuration.getServerPipelineFactory();
+        if (pipelineFactory == null) {
+            pipelineFactory = new DefaultServerPipelineFactory();
+        }
+
         if (isTcp()) {
             initializeTCPServerSocketCommunicationLayer();
         } else {
@@ -97,9 +106,9 @@ public class NettyConsumer extends Defau
             context.getExecutorServiceManager().shutdownNow(workerExecutor);
         }
 
-        super.doStop();
-
         LOG.info("Netty consumer unbound from: " + configuration.getAddress());
+
+        super.doStop();
     }
 
     public CamelContext getContext() {
@@ -165,18 +174,16 @@ public class NettyConsumer extends Defau
                                                                configuration.getWorkerCount());
         }
         serverBootstrap = new ServerBootstrap(channelFactory);
-        if (configuration.getServerPipelineFactory() != null) {
-            configuration.getServerPipelineFactory().setConsumer(this);
-            serverBootstrap.setPipelineFactory(configuration.getServerPipelineFactory());
-        } else {
-            serverBootstrap.setPipelineFactory(new DefaultServerPipelineFactory(this));
-        }
         serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
         serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
         serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
         serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
         serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
 
+        // must get the pipeline from the factory when opening a new connection
+        ChannelPipeline serverPipeline = pipelineFactory.getPipeline(this);
+        serverBootstrap.setPipeline(serverPipeline);
+
         channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
         // to keep track of all channels in use
         allChannels.add(channel);
@@ -190,12 +197,6 @@ public class NettyConsumer extends Defau
             datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
         }
         connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
-        if (configuration.getServerPipelineFactory() != null) {
-            configuration.getServerPipelineFactory().setConsumer(this);
-            connectionlessServerBootstrap.setPipelineFactory(configuration.getServerPipelineFactory());
-        } else {
-            connectionlessServerBootstrap.setPipelineFactory(new DefaultServerPipelineFactory(this));
-        }
         connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
         connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
         connectionlessServerBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
@@ -210,6 +211,10 @@ public class NettyConsumer extends Defau
                 new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor()));
         }
 
+        // must get the pipeline from the factory when opening a new connection
+        ChannelPipeline serverPipeline = pipelineFactory.getPipeline(this);
+        connectionlessServerBootstrap.setPipeline(serverPipeline);
+
         channel = connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
         // to keep track of all channels in use
         allChannels.add(channel);

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sun Apr  8 13:41:45 2012
@@ -54,6 +54,7 @@ public class NettyProducer extends Defau
     private NettyConfiguration configuration;
     private ChannelFactory channelFactory;
     private DatagramChannelFactory datagramChannelFactory;
+    private ClientPipelineFactory pipelineFactory;
     private CamelLogger noReplyLogger;
     private ExecutorService bossExecutor;
     private ExecutorService workerExecutor;
@@ -92,6 +93,12 @@ public class NettyProducer extends Defau
     protected void doStart() throws Exception {
         super.doStart();
 
+        // setup pipeline factory
+        pipelineFactory = configuration.getClientPipelineFactory();
+        if (pipelineFactory == null) {
+            pipelineFactory = new DefaultClientPipelineFactory();
+        }
+
         if (isTcp()) {
             setupTCPCommunication();
         } else {
@@ -260,16 +267,8 @@ public class NettyProducer extends Defau
         ChannelFuture answer;
         ChannelPipeline clientPipeline;
 
-        if (configuration.getClientPipelineFactory() != null) {
-            // initialize user defined client pipeline factory
-            configuration.getClientPipelineFactory().setProducer(this);
-            clientPipeline = configuration.getClientPipelineFactory().getPipeline();
-        } else {
-            // initialize client pipeline factory
-            ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this);
-            // must get the pipeline from the factory when opening a new connection
-            clientPipeline = clientPipelineFactory.getPipeline();
-        }
+        // must get the pipeline from the factory when opening a new connection
+        clientPipeline = pipelineFactory.getPipeline(this);
 
         if (isTcp()) {
             ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
@@ -283,7 +282,6 @@ public class NettyProducer extends Defau
             answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
             return answer;
         } else {
-            // TODO: Is this correct for a UDP client
             ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
             connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
             connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
@@ -295,7 +293,9 @@ public class NettyProducer extends Defau
 
             // set the pipeline on the bootstrap
             connectionlessClientBootstrap.setPipeline(clientPipeline);
-            connectionlessClientBootstrap.bind(new InetSocketAddress(0));
+            // bind and store channel so we can close it when stopping
+            Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0));
+            ALL_CHANNELS.add(channel);
             answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
             return answer;
         }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Sun Apr  8 13:41:45 2012
@@ -18,29 +18,26 @@ package org.apache.camel.component.netty
 
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
 
+/**
+ * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyConsumer}.
+ * <p/>
+ * Implementators should use implement the {@link #getPipeline(NettyConsumer)} method.
+ *
+ * @see ChannelPipelineFactory
+ */
 public abstract class ServerPipelineFactory implements ChannelPipelineFactory {
-    protected NettyConsumer consumer;
-      
-    public ServerPipelineFactory() {
-    }
-    
-    public ServerPipelineFactory(NettyConsumer consumer) {
-        this.consumer = consumer; 
-    }    
 
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline channelPipeline = Channels.pipeline();
-        return channelPipeline;
-    }
+    /**
+     * Returns a newly created {@link ChannelPipeline}.
+     *
+     * @param consumer the netty consumer
+     */
+    public abstract ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception;
 
-    public NettyConsumer getConsumer() {
-        return consumer;
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+        throw new UnsupportedOperationException("use getPipeline(NettyConsumer) instead");
     }
 
-    public void setConsumer(NettyConsumer consumer) {
-        this.consumer = consumer;
-    }
-    
 }

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java Sun Apr  8 13:41:45 2012
@@ -81,15 +81,16 @@ public class NettyCustomPipelineFactoryA
         context.stop();
         
         assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response);
-        assertEquals(true, clientPipelineFactory.isfactoryInvoked());
-        assertEquals(true, serverPipelineFactory.isfactoryInvoked());
+        assertEquals(true, clientPipelineFactory.isFactoryInvoked());
+        assertEquals(true, serverPipelineFactory.isFactoryInvoked());
     } 
     
     public class TestClientChannelPipelineFactory extends ClientPipelineFactory {
         private int maxLineSize = 1024;
         private boolean invoked;
-        
-        public ChannelPipeline getPipeline() throws Exception {
+
+        @Override
+        public ChannelPipeline getPipeline(NettyProducer producer) throws Exception {
             invoked = true;
             
             ChannelPipeline channelPipeline = Channels.pipeline();
@@ -102,7 +103,7 @@ public class NettyCustomPipelineFactoryA
             return channelPipeline;
         }
         
-        public boolean isfactoryInvoked() {
+        public boolean isFactoryInvoked() {
             return invoked;
         }
     }
@@ -110,8 +111,9 @@ public class NettyCustomPipelineFactoryA
     public class TestServerChannelPipelineFactory extends ServerPipelineFactory {
         private int maxLineSize = 1024;
         private boolean invoked;
-        
-        public ChannelPipeline getPipeline() throws Exception {
+
+        @Override
+        public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception {
             invoked = true;
             
             ChannelPipeline channelPipeline = Channels.pipeline();
@@ -124,7 +126,7 @@ public class NettyCustomPipelineFactoryA
             return channelPipeline;
         }
         
-        public boolean isfactoryInvoked() {
+        public boolean isFactoryInvoked() {
             return invoked;
         }
         

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=1310999&r1=1310998&r2=1310999&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java Sun Apr  8 13:41:45 2012
@@ -81,15 +81,16 @@ public class NettyCustomPipelineFactoryS
         context.stop();
         
         assertEquals("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'", response);
-        assertEquals(true, clientPipelineFactory.isfactoryInvoked());
-        assertEquals(true, serverPipelineFactory.isfactoryInvoked());
+        assertEquals(true, clientPipelineFactory.isFactoryInvoked());
+        assertEquals(true, serverPipelineFactory.isFactoryInvoked());
     } 
     
     public class TestClientChannelPipelineFactory extends ClientPipelineFactory {
         private int maxLineSize = 1024;
         private boolean invoked;
-        
-        public ChannelPipeline getPipeline() throws Exception {
+
+        @Override
+        public ChannelPipeline getPipeline(NettyProducer producer) throws Exception {
             invoked = true;
             
             ChannelPipeline channelPipeline = Channels.pipeline();
@@ -102,7 +103,7 @@ public class NettyCustomPipelineFactoryS
             return channelPipeline;
         }
         
-        public boolean isfactoryInvoked() {
+        public boolean isFactoryInvoked() {
             return invoked;
         }
     }
@@ -110,8 +111,9 @@ public class NettyCustomPipelineFactoryS
     public class TestServerChannelPipelineFactory extends ServerPipelineFactory {
         private int maxLineSize = 1024;
         private boolean invoked;
-        
-        public ChannelPipeline getPipeline() throws Exception {
+
+        @Override
+        public ChannelPipeline getPipeline(NettyConsumer consumer) throws Exception {
             invoked = true;
             
             ChannelPipeline channelPipeline = Channels.pipeline();
@@ -124,7 +126,7 @@ public class NettyCustomPipelineFactoryS
             return channelPipeline;
         }
         
-        public boolean isfactoryInvoked() {
+        public boolean isFactoryInvoked() {
             return invoked;
         }
     }