You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/10 10:48:48 UTC

[1/2] camel git commit: CAMEL-7500: netty producer would in case of redelivery cause x2 tasks to attempt redelivery. Thanks to Bob Browning for reporting and test case.

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x c70f7fed1 -> ba737e77e


CAMEL-7500: netty producer would in case of redelivery cause x2 tasks to attempt redelivery. Thanks to Bob Browning for reporting and test case.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2e8d46d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2e8d46d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2e8d46d

Branch: refs/heads/camel-2.15.x
Commit: b2e8d46d5ff5a54116309658e438862923420ce6
Parents: c70f7fe
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Jul 10 10:43:07 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 10 10:55:04 2015 +0200

----------------------------------------------------------------------
 .../camel/component/netty4/NettyProducer.java   |   4 +-
 .../netty4/handlers/ClientChannelHandler.java   |   2 -
 .../component/netty4/NettyRedeliveryTest.java   | 219 +++++++++++++++++++
 3 files changed, 220 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b2e8d46d/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index 9bba354..965397f 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -233,9 +233,7 @@ public class NettyProducer extends DefaultAsyncProducer {
             public void operationComplete(ChannelFuture channelFuture) throws Exception {
                 LOG.trace("Operation complete {}", channelFuture);
                 if (!channelFuture.isSuccess()) {
-                    // no success the set the caused exception and signal callback and break
-                    exchange.setException(channelFuture.cause());
-                    producerCallback.done(false);
+                    // no success, so exit (any exception has already been handled by ClientChannelHandler#exceptionCaught)
                     return;
                 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2e8d46d/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index d4651a9..92e9851 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -232,6 +232,4 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         return state != null ? state.getCallback() : null;
     }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2e8d46d/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
new file mode 100644
index 0000000..ab394da
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRedeliveryTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Deque;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Test the effect of redelivery in association with netty component.
+ */
+public class NettyRedeliveryTest extends CamelTestSupport {
+
+    /**
+     * Body of sufficient size such that it doesn't fit into the TCP buffer and has to be read.
+     */
+    private static final byte[] LARGE_BUFFER_BODY = new byte[1000000];
+
+    /**
+     * Failure can occur with as few as 2 redeliveries, but becomes increasingly likely the more it retries.
+     */
+    private static final int REDELIVERY_COUNT = 100;
+
+    private ExecutorService listener = Executors.newSingleThreadExecutor();
+
+    @EndpointInject(uri = "mock:exception")
+    private MockEndpoint exception;
+
+    @EndpointInject(uri = "mock:downstream")
+    private MockEndpoint downstream;
+
+    private Deque<Callable<?>> tasks = new LinkedBlockingDeque<Callable<?>>();
+    private int port;
+    private boolean alive = true;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        // Create a server to attempt to connect to
+        port = createServerSocket(0);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .maximumRedeliveries(REDELIVERY_COUNT)
+                        .retryAttemptedLogLevel(LoggingLevel.INFO)
+                        .retriesExhaustedLogLevel(LoggingLevel.ERROR)
+                        // lets have a little delay so we do async redelivery
+                        .redeliveryDelay(10)
+                        .to("mock:exception")
+                        .handled(true);
+
+                from("direct:start")
+                        .routeId("start")
+                        .to("netty4:tcp://localhost:" + port)
+                        .to("log:downstream")
+                        .to("mock:downstream");
+            }
+        };
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        alive = false;
+        listener.shutdown();
+    }
+
+    @Test
+    public void testExceptionHandler() throws Exception {
+        /*
+         * We should have 0 for this as it should never be successful; however, in practice this usually returns 1.
+         *
+         * This is because two or more threads run concurrently and will setException(null), which is checked during
+         * redelivery to ascertain whether the delivery was successful. This makes multiple downstream invocations
+         * possible.
+         */
+        downstream.setExpectedMessageCount(0);
+        downstream.setAssertPeriod(1000);
+
+        exception.setExpectedMessageCount(1);
+
+        sendBody("direct:start", LARGE_BUFFER_BODY);
+
+        exception.assertIsSatisfied();
+
+        // given 100 retries usually yields somewhere around -95
+        // assertEquals(0, context.getInflightRepository().size("start"));
+
+        // Verify the number of tasks submitted - sometimes both callbacks add a task
+        assertEquals(REDELIVERY_COUNT, tasks.size());
+
+        // Verify the downstream completed messages - other times one callback gets treated as done
+        downstream.assertIsSatisfied();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        // Override the error handler executor service such that we can track the tasks created
+        CamelContext context = new DefaultCamelContext(createRegistry()) {
+            @Override
+            public ScheduledExecutorService getErrorHandlerExecutorService() {
+                return getScheduledExecutorService();
+            }
+        };
+        return context;
+    }
+
+    private ScheduledExecutorService getScheduledExecutorService() {
+        final ScheduledExecutorService delegate = Executors.newScheduledThreadPool(10);
+        return newProxy(ScheduledExecutorService.class, new InvocationHandler() {
+            @Override
+            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                if ("submit".equals(method.getName()) || "schedule".equals(method.getName())) {
+                    tasks.add((Callable<?>) args[0]);
+                }
+                return method.invoke(delegate, args);
+            }
+        });
+    }
+
+    private int createServerSocket(int port) throws IOException {
+        final ServerSocket listen = new ServerSocket(port);
+        listen.setSoTimeout(100);
+        listener.execute(new Runnable() {
+
+            private ExecutorService pool = Executors.newCachedThreadPool();
+
+            @Override
+            public void run() {
+                try {
+                    while (alive) {
+                        try {
+                            pool.execute(new ClosingClientRunnable(listen.accept()));
+                        } catch (SocketTimeoutException ignored) {
+                            // Allow the server socket to terminate in a timely fashion
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                } finally {
+                    try {
+                        listen.close();
+                    } catch (IOException ignored) {
+                    }
+                }
+            }
+        });
+        return listen.getLocalPort();
+    }
+
+    private static <T> T newProxy(Class<T> interfaceType, InvocationHandler handler) {
+        Object object = Proxy.newProxyInstance(interfaceType.getClassLoader(), new Class<?>[]{interfaceType}, handler);
+        return interfaceType.cast(object);
+    }
+
+    /**
+     * Handler for client connection.
+     */
+    private class ClosingClientRunnable implements Runnable {
+        private final Socket socket;
+
+        public ClosingClientRunnable(Socket socket) {
+            this.socket = socket;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(10);
+                socket.close();
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    socket.close();
+                } catch (IOException ignored) {
+                }
+            }
+        }
+    }
+
+}


[2/2] camel git commit: CAMEL-7500: netty producer would in case of redelivery cause x2 tasks to attempt redelivery. Thanks to Bob Browning for reporting and test case.

Posted by da...@apache.org.
CAMEL-7500: netty producer would in case of redelivery cause x2 tasks to attempt redelivery. Thanks to Bob Browning for reporting and test case.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ba737e77
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ba737e77
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ba737e77

Branch: refs/heads/camel-2.15.x
Commit: ba737e77eb1c4abf511c66f1ee0914490f98388c
Parents: b2e8d46
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Jul 10 10:54:39 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 10 10:55:12 2015 +0200

----------------------------------------------------------------------
 .../camel/component/netty/NettyProducer.java    |   4 +-
 .../component/netty/NettyRedeliveryTest.java    | 219 +++++++++++++++++++
 .../netty/NettyUdpConnectedSendTest.java        |   8 +-
 3 files changed, 226 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ba737e77/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index ac1ecef..bf72284 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -264,9 +264,7 @@ public class NettyProducer extends DefaultAsyncProducer {
             public void operationComplete(ChannelFuture channelFuture) throws Exception {
                 LOG.trace("Operation complete {}", channelFuture);
                 if (!channelFuture.isSuccess()) {
-                    // no success the set the caused exception and signal callback and break
-                    exchange.setException(channelFuture.getCause());
-                    producerCallback.done(false);
+                    // no success, so exit (any exception has already been handled by ClientChannelHandler#exceptionCaught)
                     return;
                 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ba737e77/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java
new file mode 100644
index 0000000..af7ff76
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRedeliveryTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Deque;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Test the effect of redelivery in association with netty component.
+ */
+public class NettyRedeliveryTest extends CamelTestSupport {
+
+    /**
+     * Body of sufficient size such that it doesn't fit into the TCP buffer and has to be read.
+     */
+    private static final byte[] LARGE_BUFFER_BODY = new byte[1000000];
+
+    /**
+     * Failure can occur with as few as 2 redeliveries, but becomes increasingly likely the more it retries.
+     */
+    private static final int REDELIVERY_COUNT = 100;
+
+    private ExecutorService listener = Executors.newSingleThreadExecutor();
+
+    @EndpointInject(uri = "mock:exception")
+    private MockEndpoint exception;
+
+    @EndpointInject(uri = "mock:downstream")
+    private MockEndpoint downstream;
+
+    private Deque<Callable<?>> tasks = new LinkedBlockingDeque<Callable<?>>();
+    private int port;
+    private boolean alive = true;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        // Create a server to attempt to connect to
+        port = createServerSocket(0);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class)
+                        .maximumRedeliveries(REDELIVERY_COUNT)
+                        .retryAttemptedLogLevel(LoggingLevel.INFO)
+                        .retriesExhaustedLogLevel(LoggingLevel.ERROR)
+                                // lets have a little delay so we do async redelivery
+                        .redeliveryDelay(10)
+                        .to("mock:exception")
+                        .handled(true);
+
+                from("direct:start")
+                        .routeId("start")
+                        .to("netty:tcp://localhost:" + port)
+                        .to("log:downstream")
+                        .to("mock:downstream");
+            }
+        };
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        alive = false;
+        listener.shutdown();
+    }
+
+    @Test
+    public void testExceptionHandler() throws Exception {
+        /*
+         * We should have 0 for this as it should never be successful; however, in practice this usually returns 1.
+         *
+         * This is because two or more threads run concurrently and will setException(null), which is checked during
+         * redelivery to ascertain whether the delivery was successful. This makes multiple downstream invocations
+         * possible.
+         */
+        downstream.setExpectedMessageCount(0);
+        downstream.setAssertPeriod(1000);
+
+        exception.setExpectedMessageCount(1);
+
+        sendBody("direct:start", LARGE_BUFFER_BODY);
+
+        exception.assertIsSatisfied();
+
+        // given 100 retries usually yields somewhere around -95
+        // assertEquals(0, context.getInflightRepository().size("start"));
+
+        // Verify the number of tasks submitted - sometimes both callbacks add a task
+        assertEquals(REDELIVERY_COUNT, tasks.size());
+
+        // Verify the downstream completed messages - other times one callback gets treated as done
+        downstream.assertIsSatisfied();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        // Override the error handler executor service such that we can track the tasks created
+        CamelContext context = new DefaultCamelContext(createRegistry()) {
+            @Override
+            public ScheduledExecutorService getErrorHandlerExecutorService() {
+                return getScheduledExecutorService();
+            }
+        };
+        return context;
+    }
+
+    private ScheduledExecutorService getScheduledExecutorService() {
+        final ScheduledExecutorService delegate = Executors.newScheduledThreadPool(10);
+        return newProxy(ScheduledExecutorService.class, new InvocationHandler() {
+            @Override
+            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                if ("submit".equals(method.getName()) || "schedule".equals(method.getName())) {
+                    tasks.add((Callable<?>) args[0]);
+                }
+                return method.invoke(delegate, args);
+            }
+        });
+    }
+
+    private int createServerSocket(int port) throws IOException {
+        final ServerSocket listen = new ServerSocket(port);
+        listen.setSoTimeout(100);
+        listener.execute(new Runnable() {
+
+            private ExecutorService pool = Executors.newCachedThreadPool();
+
+            @Override
+            public void run() {
+                try {
+                    while (alive) {
+                        try {
+                            pool.execute(new ClosingClientRunnable(listen.accept()));
+                        } catch (SocketTimeoutException ignored) {
+                            // Allow the server socket to terminate in a timely fashion
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                } finally {
+                    try {
+                        listen.close();
+                    } catch (IOException ignored) {
+                    }
+                }
+            }
+        });
+        return listen.getLocalPort();
+    }
+
+    private static <T> T newProxy(Class<T> interfaceType, InvocationHandler handler) {
+        Object object = Proxy.newProxyInstance(interfaceType.getClassLoader(), new Class<?>[]{interfaceType}, handler);
+        return interfaceType.cast(object);
+    }
+
+    /**
+     * Handler for client connection.
+     */
+    private class ClosingClientRunnable implements Runnable {
+        private final Socket socket;
+
+        public ClosingClientRunnable(Socket socket) {
+            this.socket = socket;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(10);
+                socket.close();
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    socket.close();
+                } catch (IOException ignored) {
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ba737e77/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
index bcda6eb..ddd4c0a 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUdpConnectedSendTest.java
@@ -29,9 +29,13 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 import org.jboss.netty.handler.codec.string.StringDecoder;
 import org.jboss.netty.util.CharsetUtil;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runners.MethodSorters;
 
-
+// We need to run the tests in a fixed order
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class NettyUdpConnectedSendTest extends BaseNettyTest {
     private static final String SEND_STRING = "***<We all love camel>***";
     private static final int SEND_COUNT = 20;
@@ -49,7 +53,6 @@ public class NettyUdpConnectedSendTest extends BaseNettyTest {
                 return channelPipeline;
             }
         });
-
     }
 
 
@@ -73,6 +76,7 @@ public class NettyUdpConnectedSendTest extends BaseNettyTest {
     }
 
     @Test
+    @Ignore("This test would be failed in JDK7 sometimes")
     public void sendConnectedWithoutReceiver() throws Exception {
         int exceptionCount = 0;
         for (int i = 0; i < SEND_COUNT; ++i) {