You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/16 15:22:04 UTC
git commit: CAMEL-6644: netty suspend/resume now unbinds the acceptor.
Updated Branches:
refs/heads/master b724bcce7 -> 33342ad04
CAMEL-6644: netty suspend/resume now unbinds the acceptor.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33342ad0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33342ad0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33342ad0
Branch: refs/heads/master
Commit: 33342ad04740f678913d44d361cb6532fd4b4393
Parents: b724bcc
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 16 15:17:35 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 16 15:17:35 2013 +0200
----------------------------------------------------------------------
.../netty/http/NettyHttpConfiguration.java | 9 +++
.../component/netty/http/NettyHttpConsumer.java | 22 ++++++-
.../http/NettyHttpSuspendResume503Test.java | 69 ++++++++++++++++++++
.../netty/http/NettyHttpSuspendResumeTest.java | 5 +-
.../camel/component/netty/NettyConsumer.java | 10 +++
.../netty/NettyServerBootstrapFactory.java | 4 +-
.../SingleTCPNettyServerBootstrapFactory.java | 25 +++++++
.../SingleUDPNettyServerBootstrapFactory.java | 23 +++++--
.../component/netty/NettySuspendResumeTest.java | 61 +++++++++++++++++
9 files changed, 216 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
index 94500e9..509ea02 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConfiguration.java
@@ -37,6 +37,7 @@ public class NettyHttpConfiguration extends NettyConfiguration {
private boolean bridgeEndpoint;
private String path;
private boolean disableStreamCache;
+ private boolean send503whenSuspended = true;
public NettyHttpConfiguration() {
// we need sync=true as http is request/reply by nature
@@ -133,4 +134,12 @@ public class NettyHttpConfiguration extends NettyConfiguration {
public void setDisableStreamCache(boolean disableStreamCache) {
this.disableStreamCache = disableStreamCache;
}
+
+ public boolean isSend503whenSuspended() {
+ return send503whenSuspended;
+ }
+
+ public void setSend503whenSuspended(boolean send503whenSuspended) {
+ this.send503whenSuspended = send503whenSuspended;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
index 4943ac3..02486c4 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
@@ -45,8 +45,6 @@ public class NettyHttpConsumer extends NettyConsumer {
super.doStart();
ObjectHelper.notNull(getNettyServerBootstrapFactory(), "HttpServerBootstrapFactory", this);
getNettyServerBootstrapFactory().addConsumer(this);
-
-
}
@Override
@@ -54,4 +52,24 @@ public class NettyHttpConsumer extends NettyConsumer {
getNettyServerBootstrapFactory().removeConsumer(this);
super.doStop();
}
+
+ @Override
+ protected void doSuspend() throws Exception {
+ if (getConfiguration().isSend503whenSuspended()) {
+ // noop as the server handler will send back 503 when suspended
+ } else {
+ // will unbind the acceptor
+ super.doSuspend();
+ }
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ if (getConfiguration().isSend503whenSuspended()) {
+ // noop
+ } else {
+ // will resume the acceptor
+ super.doResume();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java
new file mode 100644
index 0000000..373e439
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResume503Test.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class NettyHttpSuspendResume503Test extends BaseNettyTest {
+
+ private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool?disconnect=true";
+
+ @Test
+ public void testNettySuspendResume() throws Exception {
+ context.getShutdownStrategy().setTimeout(50);
+
+ String reply = template.requestBody(serverUri, "World", String.class);
+ assertEquals("Bye World", reply);
+
+ // now suspend netty
+ NettyHttpConsumer consumer = (NettyHttpConsumer) context.getRoute("foo").getConsumer();
+ assertNotNull(consumer);
+
+ // suspend
+ consumer.suspend();
+
+ try {
+ template.requestBody(serverUri, "Moon", String.class);
+ fail("Should throw exception");
+ } catch (Exception e) {
+ NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
+ assertEquals(503, cause.getStatusCode());
+ }
+
+ // resume
+ consumer.resume();
+
+ Thread.sleep(2000);
+
+ // and send request which should be processed
+ reply = template.requestBody(serverUri, "Moon", String.class);
+ assertEquals("Bye Moon", reply);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(serverUri).routeId("foo")
+ .transform(body().prepend("Bye "));
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
index 5aa86d7..dbc4893 100644
--- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSuspendResumeTest.java
@@ -21,7 +21,7 @@ import org.junit.Test;
public class NettyHttpSuspendResumeTest extends BaseNettyTest {
- private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool";
+ private String serverUri = "netty-http:http://localhost:" + getPort() + "/cool?disconnect=true&send503whenSuspended=false";
@Test
public void testNettySuspendResume() throws Exception {
@@ -41,8 +41,7 @@ public class NettyHttpSuspendResumeTest extends BaseNettyTest {
template.requestBody(serverUri, "Moon", String.class);
fail("Should throw exception");
} catch (Exception e) {
- NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
- assertEquals(503, cause.getStatusCode());
+ assertTrue(e.getCause().getMessage().startsWith("Cannot connect to localhost"));
}
// resume
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
index 938ef96..fa8b06a 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
@@ -82,6 +82,16 @@ public class NettyConsumer extends DefaultConsumer {
super.doStop();
}
+ @Override
+ protected void doSuspend() throws Exception {
+ ServiceHelper.suspendService(nettyServerBootstrapFactory);
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ ServiceHelper.resumeService(nettyServerBootstrapFactory);
+ }
+
public CamelContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
index 18dfb4b..af28160 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.netty;
import java.util.concurrent.ThreadFactory;
import org.apache.camel.CamelContext;
-import org.apache.camel.Service;
+import org.apache.camel.SuspendableService;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -30,7 +30,7 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
* This factory allows consumers to reuse an existing {@link org.jboss.netty.bootstrap.ServerBootstrap}, which
* makes it possible to share the same port among multiple consumers.
*/
-public interface NettyServerBootstrapFactory extends Service {
+public interface NettyServerBootstrapFactory extends SuspendableService {
/**
* Initializes this <b>non-shared</b> {@link NettyServerBootstrapFactory}.
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
index 9e06622..97f3395 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
@@ -25,6 +25,7 @@ import org.apache.camel.support.ServiceSupport;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -97,6 +98,30 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
stopServerBootstrap();
}
+ @Override
+ protected void doResume() throws Exception {
+ if (channel != null) {
+ LOG.debug("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+ ChannelFuture future = channel.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ future.awaitUninterruptibly();
+ if (!future.isSuccess()) {
+ // if we cannot bind, then re-create the channel
+ allChannels.remove(channel);
+ channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ allChannels.add(channel);
+ }
+ }
+ }
+
+ @Override
+ protected void doSuspend() throws Exception {
+ if (channel != null) {
+ LOG.debug("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
+ ChannelFuture future = channel.unbind();
+ future.awaitUninterruptibly();
+ }
+ }
+
protected void startServerBootstrap() {
// prefer using explicit configured thread pools
BossPool bp = configuration.getBossPool();
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
index d73f67a..20faf1a 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
@@ -57,6 +57,9 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
private ChannelPipelineFactory pipelineFactory;
private DatagramChannelFactory datagramChannelFactory;
private ConnectionlessBootstrap connectionlessBootstrap;
+ private NetworkInterface multicastNetworkInterface;
+ private DatagramChannel datagramChannel;
+ private Channel channel;
private WorkerPool workerPool;
public SingleUDPNettyServerBootstrapFactory() {
@@ -104,6 +107,16 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
stopServerBootstrap();
}
+ @Override
+ protected void doResume() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doSuspend() throws Exception {
+ // noop
+ }
+
protected void startServerBootstrap() throws UnknownHostException, SocketException {
// create non-shared worker pool
int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
@@ -145,15 +158,15 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET);
if (multicastSubnet.contains(configuration.getHost())) {
- DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress);
+ datagramChannel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress);
String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface();
- NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
+ multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()});
- channel.joinGroup(hostAddress, multicastNetworkInterface);
- allChannels.add(channel);
+ datagramChannel.joinGroup(hostAddress, multicastNetworkInterface);
+ allChannels.add(datagramChannel);
} else {
LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
- Channel channel = connectionlessBootstrap.bind(hostAddress);
+ channel = connectionlessBootstrap.bind(hostAddress);
allChannels.add(channel);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33342ad0/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java
new file mode 100644
index 0000000..df66492
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySuspendResumeTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class NettySuspendResumeTest extends BaseNettyTest {
+
+ @Test
+ public void testSuspendResume() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Camel", "Again");
+
+ String out = template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "Camel", String.class);
+ assertEquals("Bye Camel", out);
+
+ context.suspendRoute("foo");
+
+ try {
+ template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "World", String.class);
+ fail("Should not allow connecting as its suspended");
+ } catch (Exception e) {
+ // expected
+ }
+
+ context.resumeRoute("foo");
+
+ out = template.requestBody("netty:tcp://localhost:{{port}}?sync=true&disconnect=true", "Again", String.class);
+ assertEquals("Bye Again", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("netty:tcp://localhost:{{port}}?sync=true").routeId("foo")
+ .to("log:result")
+ .to("mock:result")
+ .transform(body().prepend("Bye "));
+ }
+ };
+ }
+
+}