You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/16 15:22:04 UTC

git commit: CAMEL-6644: netty suspend/resume now unbinds the acceptor.

Updated Branches:
  refs/heads/master b724bcce7 -> 33342ad04


CAMEL-6644: netty suspend/resume now unbinds the acceptor.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33342ad0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33342ad0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33342ad0

Branch: refs/heads/master
Commit: 33342ad04740f678913d44d361cb6532fd4b4393
Parents: b724bcc
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 16 15:17:35 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 16 15:17:35 2013 +0200

----------------------------------------------------------------------
 .../netty/http/NettyHttpConfiguration.java      |  9 +++
 .../component/netty/http/NettyHttpConsumer.java | 22 ++++++-
 .../http/NettyHttpSuspendResume503Test.java     | 69 ++++++++++++++++++++
 .../netty/http/NettyHttpSuspendResumeTest.java  |  5 +-
 .../camel/component/netty/NettyConsumer.java    | 10 +++
 .../netty/NettyServerBootstrapFactory.java      |  4 +-
 .../SingleTCPNettyServerBootstrapFactory.java   | 25 +++++++
 .../SingleUDPNettyServerBootstrapFactory.java   | 23 +++++--
 .../component/netty/NettySuspendResumeTest.java | 61 +++++++++++++++++
 9 files changed, 216 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
index 94500e9..509ea02 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
@@ -37,6 +37,7 @@ public class NettyHttpConfiguration extends NettyConfiguration {
     private boolean bridgeEndpoint;
     private String path;
     private boolean disableStreamCache;
+    private boolean send503whenSuspended = true;
 
     public NettyHttpConfiguration() {
         // we need sync=true as http is request/reply by nature
@@ -133,4 +134,12 @@ public class NettyHttpConfiguration extends NettyConfiguration {
     public void setDisableStreamCache(boolean disableStreamCache) {
         this.disableStreamCache = disableStreamCache;
     }
+
+    public boolean isSend503whenSuspended() {
+        return send503whenSuspended;
+    }
+
+    public void setSend503whenSuspended(boolean send503whenSuspended) {
+        this.send503whenSuspended = send503whenSuspended;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
index 4943ac3..02486c4 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
@@ -45,8 +45,6 @@ public class NettyHttpConsumer extends NettyConsumer {
         super.doStart();
         ObjectHelper.notNull(getNettyServerBootstrapFactory(), "HttpServerBootstrapFactory", this);
         getNettyServerBootstrapFactory().addConsumer(this);
-
-
     }
 
     @Override
@@ -54,4 +52,24 @@ public class NettyHttpConsumer extends NettyConsumer {
         getNettyServerBootstrapFactory().removeConsumer(this);
         super.doStop();
     }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        if (getConfiguration().isSend503whenSuspended()) {
+            // noop as the server handler will send back 503 when suspended
+        } else {
+            // will unbind the acceptor
+            super.doSuspend();
+        }
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        if (getConfiguration().isSend503whenSuspended()) {
+            // noop
+        } else {
+            // will resume the acceptor
+            super.doResume();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java
new file mode 100644
index 0000000..373e439
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class NettyHttpSuspendResume503Test extends BaseNettyTest {
+
+    private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool?disconnect=true";
+
+    @Test
+    public void testNettySuspendResume() throws Exception {
+        context.getShutdownStrategy().setTimeout(50);
+
+        String reply = template.requestBody(serverUri, "World", String.class);
+        assertEquals("Bye World", reply);
+
+        // now suspend netty
+        NettyHttpConsumer consumer = (NettyHttpConsumer) context.getRoute("foo").getConsumer();
+        assertNotNull(consumer);
+
+        // suspend
+        consumer.suspend();
+
+        try {
+            template.requestBody(serverUri, "Moon", String.class);
+            fail("Should throw exception");
+        } catch (Exception e) {
+            NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
+            assertEquals(503, cause.getStatusCode());
+        }
+
+        // resume
+        consumer.resume();
+
+        Thread.sleep(2000);
+
+        // and send request which should be processed
+        reply = template.requestBody(serverUri, "Moon", String.class);
+        assertEquals("Bye Moon", reply);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(serverUri).routeId("foo")
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
index 5aa86d7..dbc4893 100644
--- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
@@ -21,7 +21,7 @@ import org.junit.Test;
 
 public class NettyHttpSuspendResumeTest extends BaseNettyTest {
 
-    private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool";
+    private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool?disconnect=true&send503whenSuspended=false";
 
     @Test
     public void testNettySuspendResume() throws Exception {
@@ -41,8 +41,7 @@ public class NettyHttpSuspendResumeTest extends BaseNettyTest {
             template.requestBody(serverUri, "Moon", String.class);
             fail("Should throw exception");
         } catch (Exception e) {
-            NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
-            assertEquals(503, cause.getStatusCode());
+            assertTrue(e.getCause().getMessage().startsWith("Cannot connect to localhost"));
         }
 
         // resume

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
index 938ef96..fa8b06a 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
@@ -82,6 +82,16 @@ public class NettyConsumer extends DefaultConsumer {
         super.doStop();
     }
 
+    @Override
+    protected void doSuspend() throws Exception {
+        ServiceHelper.suspendService(nettyServerBootstrapFactory);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        ServiceHelper.resumeService(nettyServerBootstrapFactory);
+    }
+
     public CamelContext getContext() {
         return context;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
index 18dfb4b..af28160 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.netty;
 import java.util.concurrent.ThreadFactory;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Service;
+import org.apache.camel.SuspendableService;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 
@@ -30,7 +30,7 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
  * This factory allows for consumers to reuse existing {@link org.jboss.netty.bootstrap.ServerBootstrap} which
  * allows to share the same port for multiple consumers.
  */
-public interface NettyServerBootstrapFactory extends Service {
+public interface NettyServerBootstrapFactory extends SuspendableService {
 
     /**
      * Initializes this <b>non-shared</b> {@link NettyServerBootstrapFactory}.

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
index 9e06622..97f3395 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
@@ -25,6 +25,7 @@ import org.apache.camel.support.ServiceSupport;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -97,6 +98,30 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
         stopServerBootstrap();
     }
 
+    @Override
+    protected void doResume() throws Exception {
+        if (channel != null) {
+            LOG.debug("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+            ChannelFuture future = channel.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            future.awaitUninterruptibly();
+            if (!future.isSuccess()) {
+                // if we cannot bind, then re-create the channel
+                allChannels.remove(channel);
+                channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+                allChannels.add(channel);
+            }
+        }
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        if (channel != null) {
+            LOG.debug("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
+            ChannelFuture future = channel.unbind();
+            future.awaitUninterruptibly();
+        }
+    }
+
     protected void startServerBootstrap() {
         // prefer using explicit configured thread pools
         BossPool bp = configuration.getBossPool();

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
index d73f67a..20faf1a 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
@@ -57,6 +57,9 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
     private ChannelPipelineFactory pipelineFactory;
     private DatagramChannelFactory datagramChannelFactory;
     private ConnectionlessBootstrap connectionlessBootstrap;
+    private NetworkInterface multicastNetworkInterface;
+    private DatagramChannel datagramChannel;
+    private Channel channel;
     private WorkerPool workerPool;
 
     public SingleUDPNettyServerBootstrapFactory() {
@@ -104,6 +107,16 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         stopServerBootstrap();
     }
 
+    @Override
+    protected void doResume() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        // noop
+    }
+
     protected void startServerBootstrap() throws UnknownHostException, SocketException {
         // create non-shared worker pool
         int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
@@ -145,15 +158,15 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET);
 
         if (multicastSubnet.contains(configuration.getHost())) {
-            DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress);
+            datagramChannel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress);
             String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface();
-            NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
+            multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
             LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()});
-            channel.joinGroup(hostAddress, multicastNetworkInterface);
-            allChannels.add(channel);
+            datagramChannel.joinGroup(hostAddress, multicastNetworkInterface);
+            allChannels.add(datagramChannel);
         } else {
             LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
-            Channel channel = connectionlessBootstrap.bind(hostAddress);
+            channel = connectionlessBootstrap.bind(hostAddress);
             allChannels.add(channel);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java
new file mode 100644
index 0000000..df66492
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class NettySuspendResumeTest extends BaseNettyTest {
+    
+    @Test
+    public void testSuspendResume() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Camel", "Again");
+
+        String out = template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "Camel", String.class);
+        assertEquals("Bye Camel", out);
+
+        context.suspendRoute("foo");
+
+        try {
+            template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "World", String.class);
+            fail("Should not allow connecting as its suspended");
+        } catch (Exception e) {
+            // expected
+        }
+
+        context.resumeRoute("foo");
+
+        out = template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "Again", String.class);
+        assertEquals("Bye Again", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty:tcp://localhost:{{port}}?sync=true").routeId("foo")
+                    .to("log:result")
+                    .to("mock:result")
+                    .transform(body().prepend("Bye "));
+            }
+        };
+    }
+
+}