You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2014/05/28 22:29:53 UTC

git commit: CAMEL-7472: SingleUDPNettyServerBootstrapFactory should wait synchronously for the outcome of joining a UDP multicast group. Added a manual test for this UDP multicast scenario verification. Also polished some glitches in NettyUDPAsyncTest and tightened up its asserts a bit.

Repository: camel
Updated Branches:
  refs/heads/master c3758e7e6 -> b94664af1


CAMEL-7472: SingleUDPNettyServerBootstrapFactory should wait synchronously for the outcome of joining a UDP multicast group. Added a manual test for this UDP multicast scenario verification. Also polished some glitches in NettyUDPAsyncTest and tightened up its asserts a bit.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b94664af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b94664af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b94664af

Branch: refs/heads/master
Commit: b94664af145731aa1325433ee41e237417a59757
Parents: c3758e7
Author: Babak Vahdat <bv...@apache.org>
Authored: Wed May 28 22:29:46 2014 +0200
Committer: Babak Vahdat <bv...@apache.org>
Committed: Wed May 28 22:29:46 2014 +0200

----------------------------------------------------------------------
 .../SingleUDPNettyServerBootstrapFactory.java   |  6 +-
 .../component/netty/NettyUDPAsyncTest.java      | 27 ++----
 .../netty/NettyUDPMulticastAsyncTest.java       | 92 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b94664af/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
index 4807e1e..00b8440 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -118,7 +116,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         // noop
     }
 
-    protected void startServerBootstrap() throws UnknownHostException, SocketException {
+    protected void startServerBootstrap() throws Exception {
         // create non-shared worker pool
         int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
         workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
@@ -164,7 +162,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
             multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
             ObjectHelper.notNull(multicastNetworkInterface, "No network interface found for '" + networkInterface + "'.");
             LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()});
-            datagramChannel.joinGroup(hostAddress, multicastNetworkInterface);
+            datagramChannel.joinGroup(hostAddress, multicastNetworkInterface).syncUninterruptibly();
             allChannels.add(datagramChannel);
         } else {
             LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());

http://git-wip-us.apache.org/repos/asf/camel/blob/b94664af/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java
index 5c52d79..31304e4 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java
@@ -16,37 +16,23 @@
  */
 package org.apache.camel.component.netty;
 
-import java.io.FileInputStream;
-import java.io.InputStream;
+import java.io.File;
 
-import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.converter.IOConverter;
-import org.apache.camel.util.IOHelper;
+
 import org.junit.Test;
 
 public class NettyUDPAsyncTest extends BaseNettyTest {
 
-    @EndpointInject(uri = "mock:result")
-    protected MockEndpoint resultEndpoint;
-
     private void sendFile(String uri) throws Exception {
         template.send(uri, new Processor() {
             public void process(Exchange exchange) throws Exception {
-                // Read from an input stream
-                InputStream is = IOHelper.buffered(new FileInputStream("src/test/resources/test.txt"));
-
-                byte buffer[] = IOConverter.toBytes(is);
-                is.close();
-
-                // Set the property of the charset encoding
-                exchange.setProperty(Exchange.CHARSET_NAME, "UTF-8");
-                Message in = exchange.getIn();
-                in.setBody(buffer);
+                byte[] buffer = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, new File("src/test/resources/test.txt"));
+                exchange.setProperty(Exchange.CHARSET_NAME, "ASCII");
+                exchange.getIn().setBody(buffer);
             }
         });
     }
@@ -55,7 +41,10 @@ public class NettyUDPAsyncTest extends BaseNettyTest {
     public void testUDPInOnlyWithNettyConsumer() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
+        mock.message(0).body().startsWith("Song Of A Dream".getBytes());
+
         sendFile("netty:udp://localhost:{{port}}?sync=false");
+
         mock.assertIsSatisfied();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b94664af/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPMulticastAsyncTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPMulticastAsyncTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPMulticastAsyncTest.java
new file mode 100644
index 0000000..4e361fc
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPMulticastAsyncTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * To run this test manually through Maven first remove the {@link Ignore}
+ * annotation below, then make sure you've got a Network interface with the name
+ * <code>en0</code> as given by the route below. If this is not the case run
+ * your OS specific command to find out which Network interfaces you've got
+ * supporting IPv4. For example on OS-X you can use the following command for
+ * this:
+ * 
+ * <pre>
+ *   <code>$> ifconfig -a</code>
+ * </pre>
+ * 
+ * Then replace the <code>en0</code> Network interface name below with your own
+ * one. Now running the test manually should succeed (<b>only</b> when using
+ * Java7+):
+ * 
+ * <pre>
+ *   <code>mvn test -Djava.net.preferIPv4Stack=true -Dtest=NettyUDPMulticastAsyncTest</code>
+ * </pre>
+ * 
+ * Note that the usage of JUnit {@link BeforeClass} annotation to achieve the
+ * same effect would not work in this case as at that stage it would be too late
+ * to use {@link System#setProperty(String, String) the Java API} to reach the
+ * same effect. Also setting such a system property through the surefire-plugin
+ * would cause side effects in the other tests of this component.
+ */
+@Ignore("See the Javadoc")
+public class NettyUDPMulticastAsyncTest extends BaseNettyTest {
+
+    private void sendFile(String uri) throws Exception {
+        template.send(uri, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                byte[] buffer = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, new File("src/test/resources/test.txt"));
+                exchange.setProperty(Exchange.CHARSET_NAME, "ASCII");
+                exchange.getIn().setBody(buffer);
+            }
+        });
+    }
+
+    @Test
+    public void testUDPInOnlyWithNettyConsumer() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.message(0).body().startsWith("Song Of A Dream".getBytes());
+
+        sendFile("netty:udp://224.1.2.3:{{port}}?sync=false");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty:udp://224.1.2.3:{{port}}?sync=false&networkInterface=en0")
+                    .to("mock:result")
+                    .to("log:Message"); 
+            }
+        };
+    }
+
+}