You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/09 09:12:42 UTC
git commit: CAMEL-6403: DefaultConsumer allows to handle the
lifecycle of Exchange UoW,
which is needed for HTTP/TCP consumers which can send back very large
streaming based replies. As the UoW work should be done after the reply has
been sent.
Updated Branches:
refs/heads/master fa97c17a1 -> 497ea3717
CAMEL-6403: DefaultConsumer allows to handle the lifecycle of Exchange UoW, which is needed for HTTP/TCP consumers which can send back very large streaming based replies. As the UoW work should be done after the reply has been sent.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/497ea371
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/497ea371
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/497ea371
Branch: refs/heads/master
Commit: 497ea3717e596af7a1a753e6458a5c5cd9cc6f6e
Parents: fa97c17
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Aug 8 15:23:03 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 9 07:29:21 2013 +0200
----------------------------------------------------------------------
.../org/apache/camel/impl/DefaultConsumer.java | 34 +++++++++
.../camel/processor/CamelInternalProcessor.java | 39 ++---------
.../org/apache/camel/util/UnitOfWorkHelper.java | 51 ++++++++++++++
.../camel/component/http/CamelServlet.java | 13 +++-
.../jetty/CamelContinuationServlet.java | 12 ++++
.../jetty/HttpStreamCacheFileResponseTest.java | 72 ++++++++++++++++++++
.../NettyHttpStreamCacheFileResponseTest.java | 69 +++++++++++++++++++
.../netty/handlers/ServerChannelHandler.java | 7 ++
8 files changed, 260 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
index 9aa0a73..d8cb30f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
@@ -19,12 +19,15 @@ package org.apache.camel.impl;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.URISupport;
+import org.apache.camel.util.UnitOfWorkHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,37 @@ public class DefaultConsumer extends ServiceSupport implements Consumer {
return "Consumer[" + URISupport.sanitizeUri(endpoint.getEndpointUri()) + "]";
}
+ /**
+ * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
+ * the processed {@link Exchange} then this method should be use to create and start
+ * the {@link UnitOfWork} on the exchange.
+ *
+ * @param exchange the exchange
+ * @return the created and started unit of work
+ * @throws Exception is thrown if error starting the unit of work
+ *
+ * @see #doneUoW(org.apache.camel.Exchange)
+ */
+ public UnitOfWork createUoW(Exchange exchange) throws Exception {
+ UnitOfWork uow = UnitOfWorkHelper.createUoW(exchange);
+ exchange.setUnitOfWork(uow);
+ uow.start();
+ return uow;
+ }
+
+ /**
+ * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
+ * the processed {@link Exchange} then this method should be executed when the consumer
+ * is finished processing the message.
+ *
+ * @param exchange the exchange
+ *
+ * @see #createUoW(org.apache.camel.Exchange)
+ */
+ public void doneUoW(Exchange exchange) {
+ UnitOfWorkHelper.doneUow(exchange.getUnitOfWork(), exchange);
+ }
+
public Endpoint getEndpoint() {
return endpoint;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 343eed6..5c4ef85 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -32,8 +32,6 @@ import org.apache.camel.StatefulService;
import org.apache.camel.StreamCache;
import org.apache.camel.api.management.PerformanceCounter;
import org.apache.camel.impl.DefaultMessageHistory;
-import org.apache.camel.impl.DefaultUnitOfWork;
-import org.apache.camel.impl.MDCUnitOfWork;
import org.apache.camel.management.DelegatePerformanceCounter;
import org.apache.camel.management.mbean.ManagedPerformanceCounter;
import org.apache.camel.model.ProcessorDefinition;
@@ -48,6 +46,7 @@ import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.UnitOfWorkHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -598,7 +597,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
if (exchange.getUnitOfWork() == null) {
// If there is no existing UoW, then we should start one and
// terminate it once processing is completed for the exchange.
- final UnitOfWork uow = createUnitOfWork(exchange);
+ UnitOfWork uow = createUnitOfWork(exchange);
exchange.setUnitOfWork(uow);
uow.start();
return uow;
@@ -609,42 +608,14 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
@Override
public void after(Exchange exchange, UnitOfWork uow) throws Exception {
+ // execute done on uow if we created it, and the consumer is not doing it
if (uow != null) {
- doneUow(uow, exchange);
+ UnitOfWorkHelper.doneUow(uow, exchange);
}
}
protected UnitOfWork createUnitOfWork(Exchange exchange) {
- UnitOfWork answer;
- if (exchange.getContext().isUseMDCLogging()) {
- answer = new MDCUnitOfWork(exchange);
- } else {
- answer = new DefaultUnitOfWork(exchange);
- }
- return answer;
- }
-
- private void doneUow(UnitOfWork uow, Exchange exchange) {
- // unit of work is done
- try {
- if (uow != null) {
- uow.done(exchange);
- }
- } catch (Throwable e) {
- LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
- + ". This exception will be ignored.", e);
- }
- try {
- if (uow != null) {
- uow.stop();
- }
- } catch (Throwable e) {
- LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
- + ". This exception will be ignored.", e);
- }
-
- // remove uow from exchange as its done
- exchange.setUnitOfWork(null);
+ return UnitOfWorkHelper.createUoW(exchange);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
index 4550ece..835d237 100644
--- a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
@@ -21,17 +21,68 @@ import java.util.Collections;
import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.impl.MDCUnitOfWork;
import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @version
*/
public final class UnitOfWorkHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(UnitOfWorkHelper.class);
+
private UnitOfWorkHelper() {
}
+ /**
+ * Creates a new {@link UnitOfWork}.
+ *
+ * @param exchange the exchange
+ * @return the created unit of work (is not started)
+ */
+ public static UnitOfWork createUoW(Exchange exchange) {
+ UnitOfWork answer;
+ if (exchange.getContext().isUseMDCLogging()) {
+ answer = new MDCUnitOfWork(exchange);
+ } else {
+ answer = new DefaultUnitOfWork(exchange);
+ }
+ return answer;
+ }
+
+ /**
+ * Done and stop the {@link UnitOfWork}.
+ *
+ * @param uow the unit of work
+ * @param exchange the exchange (will unset the UoW on the exchange)
+ */
+ public static void doneUow(UnitOfWork uow, Exchange exchange) {
+ // unit of work is done
+ try {
+ if (uow != null) {
+ uow.done(exchange);
+ }
+ } catch (Throwable e) {
+ LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
+ + ". This exception will be ignored.", e);
+ }
+ try {
+ if (uow != null) {
+ uow.stop();
+ }
+ } catch (Throwable e) {
+ LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
+ + ". This exception will be ignored.", e);
+ }
+
+ // remove uow from exchange as its done
+ exchange.setUnitOfWork(null);
+ }
+
public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {
boolean failed = exchange.isFailed();
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java b/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
index 7da5f30..823fd0b 100644
--- a/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
+++ b/components/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
@@ -86,6 +86,14 @@ public class CamelServlet extends HttpServlet {
// create exchange and set data on it
Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
+ // we want to handle the UoW
+ try {
+ consumer.createUoW(exchange);
+ } catch (Exception e) {
+ log.error("Error processing request", e);
+ throw new ServletException(e);
+ }
+
if (consumer.getEndpoint().isBridgeEndpoint()) {
exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
}
@@ -138,6 +146,7 @@ public class CamelServlet extends HttpServlet {
log.error("Error processing request", e);
throw new ServletException(e);
} finally {
+ consumer.doneUoW(exchange);
restoreTccl(exchange, oldTccl);
}
}
@@ -202,9 +211,7 @@ public class CamelServlet extends HttpServlet {
}
/**
- * Restore the Thread Context ClassLoader if the Old TCCL is not null.
- * @param exchange
- * @param oldTccl
+ * Restore the Thread Context ClassLoader if the old TCCL is not null.
*/
protected void restoreTccl(final Exchange exchange, ClassLoader oldTccl) {
if (oldTccl == null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index 056e135..229b7e3 100644
--- a/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++ b/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -31,6 +31,8 @@ import org.apache.camel.component.http.HttpConsumer;
import org.apache.camel.component.http.HttpMessage;
import org.apache.camel.component.http.helper.HttpHelper;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.UnitOfWorkHelper;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
@@ -100,6 +102,14 @@ public class CamelContinuationServlet extends CamelServlet {
// a new request so create an exchange
final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
+ // we want to handle the UoW
+ try {
+ consumer.createUoW(exchange);
+ } catch (Exception e) {
+ log.error("Error processing request", e);
+ throw new ServletException(e);
+ }
+
if (consumer.getEndpoint().isBridgeEndpoint()) {
exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
}
@@ -179,6 +189,8 @@ public class CamelContinuationServlet extends CamelServlet {
} catch (Exception e) {
log.error("Error processing request", e);
throw new ServletException(e);
+ } finally {
+ consumer.doneUoW(result);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java
new file mode 100644
index 0000000..f9432f3
--- /dev/null
+++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileResponseTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class HttpStreamCacheFileResponseTest extends BaseJettyTest {
+
+ private String body = "12345678901234567890123456789012345678901234567890";
+ private String body2 = "Bye " + body;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ deleteDirectory("target/cachedir");
+ createDirectory("target/cachedir");
+ super.setUp();
+ }
+
+ @Test
+ public void testStreamCacheToFileShouldBeDeletedInCaseOfResponse() throws Exception {
+ String out = template.requestBody("http://localhost:{{port}}/myserver", body, String.class);
+ assertEquals(body2, out);
+
+ // the temporary files should have been deleted
+ File file = new File("target/cachedir");
+ String[] files = file.list();
+ assertEquals("There should be no files", 0, files.length);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // enable stream caching and use a low threshold so its forced to write to file
+ context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
+ context.getStreamCachingStrategy().setSpoolThreshold(16);
+ context.setStreamCaching(true);
+
+ from("jetty://http://localhost:{{port}}/myserver")
+ // wrap the response in 2 input streams so it will force caching to disk
+ .transform().constant(new BufferedInputStream(new ByteArrayInputStream(body2.getBytes())))
+ .to("log:reply");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java
new file mode 100644
index 0000000..d2cda96
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpStreamCacheFileResponseTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NettyHttpStreamCacheFileResponseTest extends BaseNettyTest {
+
+ private String body = "12345678901234567890123456789012345678901234567890";
+ private String body2 = "Bye " + body;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ deleteDirectory("target/cachedir");
+ createDirectory("target/cachedir");
+ super.setUp();
+ }
+
+ @Test
+ public void testStreamCacheToFileShouldBeDeletedInCaseOfResponse() throws Exception {
+ String out = template.requestBody("http://localhost:{{port}}/myserver", body, String.class);
+ assertEquals(body2, out);
+
+ // the temporary files should have been deleted
+ File file = new File("target/cachedir");
+ String[] files = file.list();
+ assertEquals("There should be no files", 0, files.length);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // enable stream caching and use a low threshold so its forced to write to file
+ context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir");
+ context.getStreamCachingStrategy().setSpoolThreshold(16);
+ context.setStreamCaching(true);
+
+ from("netty-http://http://localhost:{{port}}/myserver")
+ // wrap the response in 2 input streams so it will force caching to disk
+ .transform().constant(new BufferedInputStream(new ByteArrayInputStream(body2.getBytes())))
+ .to("log:reply");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/497ea371/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index 2fa1bd3..6aba7a8 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -87,6 +87,9 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
// create Exchange and let the consumer process it
final Exchange exchange = consumer.getEndpoint().createExchange(ctx, messageEvent);
+ // we want to handle the UoW
+ consumer.createUoW(exchange);
+
if (consumer.getConfiguration().isSync()) {
exchange.setPattern(ExchangePattern.InOut);
}
@@ -123,6 +126,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
}
} catch (Throwable e) {
consumer.getExceptionHandler().handleException(e);
+ } finally {
+ consumer.doneUoW(exchange);
}
}
@@ -137,6 +142,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
}
} catch (Throwable e) {
consumer.getExceptionHandler().handleException(e);
+ } finally {
+ consumer.doneUoW(exchange);
}
}
});