You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/09 09:12:42 UTC

git commit: CAMEL-6403: DefaultConsumer allows to handle the lifecycle of Exchange UoW, which is needed for HTTP/TCP consumers which can send back very large streaming based replies. As the UoW work should be done after the reply has been sent.

Updated Branches:
  refs/heads/master fa97c17a1 -> 497ea3717


CAMEL-6403: DefaultConsumer allows to handle the lifecycle of Exchange UoW, which is needed for HTTP/TCP consumers which can send back very large streaming based replies. As the UoW work should be done after the reply has been sent.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/497ea371
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/497ea371
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/497ea371

Branch: refs/heads/master
Commit: 497ea3717e596af7a1a753e6458a5c5cd9cc6f6e
Parents: fa97c17
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Aug 8 15:23:03 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 9 07:29:21 2013 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/DefaultConsumer.java  | 34 +++++++++
 .../camel/processor/CamelInternalProcessor.java | 39 ++---------
 .../org/apache/camel/util/UnitOfWorkHelper.java | 51 ++++++++++++++
 .../camel/component/http/CamelServlet.java      | 13 +++-
 .../jetty/CamelContinuationServlet.java         | 12 ++++
 .../jetty/HttpStreamCacheFileResponseTest.java  | 72 ++++++++++++++++++++
 .../NettyHttpStreamCacheFileResponseTest.java   | 69 +++++++++++++++++++
 .../netty/handlers/ServerChannelHandler.java    |  7 ++
 8 files changed, 260 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
index 9aa0a73..d8cb30f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
@@ -19,12 +19,15 @@ package org.apache.camel.impl;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.URISupport;
+import org.apache.camel.util.UnitOfWorkHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +53,37 @@ public class DefaultConsumer extends ServiceSupport implements Consumer {
         return "Consumer[" + URISupport.sanitizeUri(endpoint.getEndpointUri()) + "]";
     }
 
+    /**
+     * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
+     * the processed {@link Exchange} then this method should be use to create and start
+     * the {@link UnitOfWork} on the exchange.
+     *
+     * @param exchange the exchange
+     * @return the created and started unit of work
+     * @throws Exception is thrown if error starting the unit of work
+     *
+     * @see #doneUoW(org.apache.camel.Exchange)
+     */
+    public UnitOfWork createUoW(Exchange exchange) throws Exception {
+        UnitOfWork uow = UnitOfWorkHelper.createUoW(exchange);
+        exchange.setUnitOfWork(uow);
+        uow.start();
+        return uow;
+    }
+
+    /**
+     * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
+     * the processed {@link Exchange} then this method should be executed when the consumer
+     * is finished processing the message.
+     *
+     * @param exchange the exchange
+     *
+     * @see #createUoW(org.apache.camel.Exchange)
+     */
+    public void doneUoW(Exchange exchange) {
+        UnitOfWorkHelper.doneUow(exchange.getUnitOfWork(), exchange);
+    }
+
     public Endpoint getEndpoint() {
         return endpoint;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 343eed6..5c4ef85 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -32,8 +32,6 @@ import org.apache.camel.StatefulService;
 import org.apache.camel.StreamCache;
 import org.apache.camel.api.management.PerformanceCounter;
 import org.apache.camel.impl.DefaultMessageHistory;
-import org.apache.camel.impl.DefaultUnitOfWork;
-import org.apache.camel.impl.MDCUnitOfWork;
 import org.apache.camel.management.DelegatePerformanceCounter;
 import org.apache.camel.management.mbean.ManagedPerformanceCounter;
 import org.apache.camel.model.ProcessorDefinition;
@@ -48,6 +46,7 @@ import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.UnitOfWorkHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -598,7 +597,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             if (exchange.getUnitOfWork() == null) {
                 // If there is no existing UoW, then we should start one and
                 // terminate it once processing is completed for the exchange.
-                final UnitOfWork uow = createUnitOfWork(exchange);
+                UnitOfWork uow = createUnitOfWork(exchange);
                 exchange.setUnitOfWork(uow);
                 uow.start();
                 return uow;
@@ -609,42 +608,14 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
         @Override
         public void after(Exchange exchange, UnitOfWork uow) throws Exception {
+            // execute done on uow if we created it, and the consumer is not doing it
             if (uow != null) {
-                doneUow(uow, exchange);
+                UnitOfWorkHelper.doneUow(uow, exchange);
             }
         }
 
         protected UnitOfWork createUnitOfWork(Exchange exchange) {
-            UnitOfWork answer;
-            if (exchange.getContext().isUseMDCLogging()) {
-                answer = new MDCUnitOfWork(exchange);
-            } else {
-                answer = new DefaultUnitOfWork(exchange);
-            }
-            return answer;
-        }
-
-        private void doneUow(UnitOfWork uow, Exchange exchange) {
-            // unit of work is done
-            try {
-                if (uow != null) {
-                    uow.done(exchange);
-                }
-            } catch (Throwable e) {
-                LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
-                        + ". This exception will be ignored.", e);
-            }
-            try {
-                if (uow != null) {
-                    uow.stop();
-                }
-            } catch (Throwable e) {
-                LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
-                        + ". This exception will be ignored.", e);
-            }
-
-            // remove uow from exchange as its done
-            exchange.setUnitOfWork(null);
+            return UnitOfWorkHelper.createUoW(exchange);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
index 4550ece..835d237 100644
--- a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
@@ -21,17 +21,68 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.impl.MDCUnitOfWork;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @version 
  */
 public final class UnitOfWorkHelper {
 
+    private static final Logger LOG = LoggerFactory.getLogger(UnitOfWorkHelper.class);
+
     private UnitOfWorkHelper() {
     }
 
+    /**
+     * Creates a new {@link UnitOfWork}.
+     *
+     * @param exchange the exchange
+     * @return the created unit of work (is not started)
+     */
+    public static UnitOfWork createUoW(Exchange exchange) {
+        UnitOfWork answer;
+        if (exchange.getContext().isUseMDCLogging()) {
+            answer = new MDCUnitOfWork(exchange);
+        } else {
+            answer = new DefaultUnitOfWork(exchange);
+        }
+        return answer;
+    }
+
+    /**
+     * Done and stop the {@link UnitOfWork}.
+     *
+     * @param uow the unit of work
+     * @param exchange the exchange (will unset the UoW on the exchange)
+     */
+    public static void doneUow(UnitOfWork uow, Exchange exchange) {
+        // unit of work is done
+        try {
+            if (uow != null) {
+                uow.done(exchange);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
+                    + ". This exception will be ignored.", e);
+        }
+        try {
+            if (uow != null) {
+                uow.stop();
+            }
+        } catch (Throwable e) {
+            LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
+                    + ". This exception will be ignored.", e);
+        }
+
+        // remove uow from exchange as its done
+        exchange.setUnitOfWork(null);
+    }
+
     public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {
         boolean failed = exchange.isFailed();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java b/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
index 7da5f30..823fd0b 100644
--- a/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
+++ b/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
@@ -86,6 +86,14 @@ public class CamelServlet extends HttpServlet {
         
         // create exchange and set data on it
         Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
+        // we want to handle the UoW
+        try {
+            consumer.createUoW(exchange);
+        } catch (Exception e) {
+            log.error("Error processing request", e);
+            throw new ServletException(e);
+        }
+
         if (consumer.getEndpoint().isBridgeEndpoint()) {
             exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
         }
@@ -138,6 +146,7 @@ public class CamelServlet extends HttpServlet {
             log.error("Error processing request", e);
             throw new ServletException(e);
         } finally {
+            consumer.doneUoW(exchange);
             restoreTccl(exchange, oldTccl);
         }
     }
@@ -202,9 +211,7 @@ public class CamelServlet extends HttpServlet {
     }
 
     /**
-     * Restore the Thread Context ClassLoader if the Old TCCL is not null.
-     * @param exchange
-     * @param oldTccl
+     * Restore the Thread Context ClassLoader if the old TCCL is not null.
      */
     protected void restoreTccl(final Exchange exchange, ClassLoader oldTccl) {
         if (oldTccl == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index 056e135..229b7e3 100644
--- a/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++ b/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -31,6 +31,8 @@ import org.apache.camel.component.http.HttpConsumer;
 import org.apache.camel.component.http.HttpMessage;
 import org.apache.camel.component.http.helper.HttpHelper;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.UnitOfWorkHelper;
 import org.eclipse.jetty.continuation.Continuation;
 import org.eclipse.jetty.continuation.ContinuationSupport;
 
@@ -100,6 +102,14 @@ public class CamelContinuationServlet extends CamelServlet {
 
             // a new request so create an exchange
             final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
+            // we want to handle the UoW
+            try {
+                consumer.createUoW(exchange);
+            } catch (Exception e) {
+                log.error("Error processing request", e);
+                throw new ServletException(e);
+            }
+
             if (consumer.getEndpoint().isBridgeEndpoint()) {
                 exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
             }
@@ -179,6 +189,8 @@ public class CamelContinuationServlet extends CamelServlet {
         } catch (Exception e) {
             log.error("Error processing request", e);
             throw new ServletException(e);
+        } finally {
+            consumer.doneUoW(result);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java
new file mode 100644
index 0000000..f9432f3
--- /dev/null
+++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class HttpStreamCacheFileResponseTest extends BaseJettyTest {
+
+    private String body = "12345678901234567890123456789012345678901234567890";
+    private String body2 = "Bye " + body;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        deleteDirectory("target/cachedir");
+        createDirectory("target/cachedir");
+        super.setUp();
+    }
+
+    @Test
+    public void testStreamCacheToFileShouldBeDeletedInCaseOfResponse() throws Exception {
+        String out = template.requestBody("http://localhost:{{port}}/myserver", body, String.class);
+        assertEquals(body2, out);
+
+        // the temporary files should have been deleted
+        File file = new File("target/cachedir");
+        String[] files = file.list();
+        assertEquals("There should be no files", 0, files.length);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable stream caching and use a low threshold so its forced to write to file
+                context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
+                context.getStreamCachingStrategy().setSpoolThreshold(16);
+                context.setStreamCaching(true);
+
+                from("jetty://http://localhost:{{port}}/myserver")
+                    // wrap the response in 2 input streams so it will force caching to disk
+                    .transform().constant(new BufferedInputStream(new ByteArrayInputStream(body2.getBytes())))
+                    .to("log:reply");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java
new file mode 100644
index 0000000..d2cda96
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NettyHttpStreamCacheFileResponseTest extends BaseNettyTest {
+
+    private String body = "12345678901234567890123456789012345678901234567890";
+    private String body2 = "Bye " + body;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        deleteDirectory("target/cachedir");
+        createDirectory("target/cachedir");
+        super.setUp();
+    }
+
+    @Test
+    public void testStreamCacheToFileShouldBeDeletedInCaseOfResponse() throws Exception {
+        String out = template.requestBody("http://localhost:{{port}}/myserver", body, String.class);
+        assertEquals(body2, out);
+
+        // the temporary files should have been deleted
+        File file = new File("target/cachedir");
+        String[] files = file.list();
+        assertEquals("There should be no files", 0, files.length);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable stream caching and use a low threshold so its forced to write to file
+                context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
+                context.getStreamCachingStrategy().setSpoolThreshold(16);
+                context.setStreamCaching(true);
+
+                from("netty-http://http://localhost:{{port}}/myserver")
+                        // wrap the response in 2 input streams so it will force caching to disk
+                        .transform().constant(new BufferedInputStream(new ByteArrayInputStream(body2.getBytes())))
+                        .to("log:reply");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index 2fa1bd3..6aba7a8 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -87,6 +87,9 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
 
         // create Exchange and let the consumer process it
         final Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
+        // we want to handle the UoW
+        consumer.createUoW(exchange);
+
         if (consumer.getConfiguration().isSync()) {
             exchange.setPattern(ExchangePattern.InOut);
         }
@@ -123,6 +126,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
             }
         } catch (Throwable e) {
             consumer.getExceptionHandler().handleException(e);
+        } finally {
+            consumer.doneUoW(exchange);
         }
     }
 
@@ -137,6 +142,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
                     }
                 } catch (Throwable e) {
                     consumer.getExceptionHandler().handleException(e);
+                } finally {
+                    consumer.doneUoW(exchange);
                 }
             }
         });