You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/06 12:26:42 UTC
svn commit: r1198338 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/Exchange.java
main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
Author: davsclaus
Date: Sun Nov 6 11:26:42 2011
New Revision: 1198338
URL: http://svn.apache.org/viewvc?rev=1198338&view=rev
Log:
CAMEL-4625: Exchange property can control if UoWPorcessor process sync or async. Needed by some consumers to have fine grained control over this.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1198338&r1=1198337&r2=1198338&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Nov 6 11:26:42 2011
@@ -183,7 +183,8 @@ public interface Exchange {
String TRACE_EVENT_EXCHANGE = "CamelTraceEventExchange";
String TRANSFER_ENCODING = "Transfer-Encoding";
- String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted";
+ String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted";
+ String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync";
String XSLT_FILE_NAME = "CamelXsltFileName";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=1198338&r1=1198337&r2=1198338&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Sun Nov 6 11:26:42 2011
@@ -24,6 +24,7 @@ import org.apache.camel.impl.DefaultUnit
import org.apache.camel.impl.MDCUnitOfWork;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,34 +98,12 @@ public class UnitOfWorkProcessor extends
return true;
}
- // process the exchange
- try {
- return processor.process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // Order here matters. We need to complete the callbacks
- // since they will likely update the exchange with some final results.
- try {
- callback.done(doneSync);
- } finally {
- doneUow(uow, exchange);
- }
- }
- });
- } catch (Throwable e) {
- LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
-
- // fallback and catch any exceptions the process may not have caught
- // we must ensure to done the UoW in all cases and issue done on the callback
- exchange.setException(e);
-
- // Order here matters. We need to complete the callbacks
- // since they will likely update the exchange with some final results.
- try {
- callback.done(true);
- } finally {
- doneUow(uow, exchange);
- }
- return true;
+ Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
+ if (synchronous != null) {
+ // the exchange signalled to process synchronously
+ return processSync(exchange, callback, uow);
+ } else {
+ return processAsync(exchange, callback, uow);
}
} else {
// There was an existing UoW, so we should just pass through..
@@ -133,6 +112,59 @@ public class UnitOfWorkProcessor extends
}
}
+ protected boolean processSync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+ LOG.trace("Exchange marked UnitOfWork to be processed synchronously: {}", exchange);
+
+ // process the exchange synchronously
+ try {
+ AsyncProcessorHelper.process(processor, exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+
+ try {
+ callback.done(true);
+ } finally {
+ doneUow(uow, exchange);
+ }
+
+ return true;
+ }
+
+ protected boolean processAsync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+ LOG.trace("Processing exchange asynchronously: {}", exchange);
+
+ // process the exchange asynchronously
+ try {
+ return processor.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // Order here matters. We need to complete the callbacks
+ // since they will likely update the exchange with some final results.
+ try {
+ callback.done(doneSync);
+ } finally {
+ doneUow(uow, exchange);
+ }
+ }
+ });
+ } catch (Throwable e) {
+ LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
+
+ // fallback and catch any exceptions the process may not have caught
+ // we must ensure to done the UoW in all cases and issue done on the callback
+ exchange.setException(e);
+
+ // Order here matters. We need to complete the callbacks
+ // since they will likely update the exchange with some final results.
+ try {
+ callback.done(true);
+ } finally {
+ doneUow(uow, exchange);
+ }
+ return true;
+ }
+ }
+
/**
* Strategy to create the unit of work for the given exchange.
*
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java?rev=1198338&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java Sun Nov 6 11:26:42 2011
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ *
+ */
+public class UnitOfWorkSyncProcessTest extends ContextTestSupport {
+
+ private static String consumerThread;
+ private static String afterThread;
+ private static String taskThread;
+ private static String doneThread;
+ private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ public void testUnitOfWorkSync() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ // should be same thread
+ assertEquals(taskThread, afterThread);
+ // should not be same
+ assertNotSame(doneThread, afterThread);
+ assertNotSame(doneThread, consumerThread);
+ // should be same thread
+ assertEquals(consumerThread, doneThread);
+
+ executorService.shutdownNow();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(new MyEndpoint())
+ .process(new AsyncProcessor() {
+ @Override
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ taskThread = Thread.currentThread().getName();
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ exchange.getIn().setHeader("foo", 123);
+ callback.done(false);
+ }
+ });
+
+ return false;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // noop
+ }
+ })
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ afterThread = Thread.currentThread().getName();
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+
+ private final class MyEndpoint extends DefaultEndpoint {
+
+ @Override
+ public Producer createProducer() throws Exception {
+ // not supported
+ return null;
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new MyConsumer(this, processor);
+ }
+
+ @Override
+ protected String createEndpointUri() {
+ return "myEndpoint://foo";
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ }
+
+ private final class MyConsumer implements Consumer {
+ private Processor processor;
+ private Endpoint endpoint;
+
+ private MyConsumer(Endpoint endpoint, Processor processor) {
+ this.endpoint = endpoint;
+ this.processor = processor;
+ }
+
+ @Override
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ @Override
+ public void start() throws Exception {
+ consumerThread = Thread.currentThread().getName();
+
+ Exchange exchange = new DefaultExchange(context);
+ exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, true);
+ exchange.addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ doneThread = Thread.currentThread().getName();
+ }
+ });
+
+ // just fire the exchange when started
+ processor.process(exchange);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // noop
+ }
+ }
+}