You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/06 12:26:42 UTC

svn commit: r1198338 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/Exchange.java main/java/org/apache/camel/processor/UnitOfWorkProcessor.java test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java

Author: davsclaus
Date: Sun Nov  6 11:26:42 2011
New Revision: 1198338

URL: http://svn.apache.org/viewvc?rev=1198338&view=rev
Log:
CAMEL-4625: Exchange property can control if UoWPorcessor process sync or async. Needed by some consumers to have fine grained control over this.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1198338&r1=1198337&r2=1198338&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Nov  6 11:26:42 2011
@@ -183,7 +183,8 @@ public interface Exchange {
     String TRACE_EVENT_EXCHANGE  = "CamelTraceEventExchange";
     String TRANSFER_ENCODING     = "Transfer-Encoding";
 
-    String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted";
+    String UNIT_OF_WORK_EXHAUSTED    = "CamelUnitOfWorkExhausted";
+    String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync";
 
     String XSLT_FILE_NAME = "CamelXsltFileName";
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=1198338&r1=1198337&r2=1198338&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Sun Nov  6 11:26:42 2011
@@ -24,6 +24,7 @@ import org.apache.camel.impl.DefaultUnit
 import org.apache.camel.impl.MDCUnitOfWork;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,34 +98,12 @@ public class UnitOfWorkProcessor extends
                 return true;
             }
 
-            // process the exchange
-            try {
-                return processor.process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        // Order here matters. We need to complete the callbacks
-                        // since they will likely update the exchange with some final results.
-                        try {
-                            callback.done(doneSync);
-                        } finally {
-                            doneUow(uow, exchange);
-                        }
-                    }
-                });
-            } catch (Throwable e) {
-                LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
-
-                // fallback and catch any exceptions the process may not have caught
-                // we must ensure to done the UoW in all cases and issue done on the callback
-                exchange.setException(e);
-
-                // Order here matters. We need to complete the callbacks
-                // since they will likely update the exchange with some final results.
-                try {
-                    callback.done(true);
-                } finally {
-                    doneUow(uow, exchange);
-                }
-                return true;
+            Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
+            if (synchronous != null) {
+                // the exchange signalled to process synchronously
+                return processSync(exchange, callback, uow);
+            } else {
+                return processAsync(exchange, callback, uow);
             }
         } else {
             // There was an existing UoW, so we should just pass through..
@@ -133,6 +112,59 @@ public class UnitOfWorkProcessor extends
         }
     }
 
+    protected boolean processSync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+        LOG.trace("Exchange marked UnitOfWork to be processed synchronously: {}", exchange);
+
+        // process the exchange synchronously
+        try {
+            AsyncProcessorHelper.process(processor, exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        try {
+            callback.done(true);
+        } finally {
+            doneUow(uow, exchange);
+        }
+
+        return true;
+    }
+
+    protected boolean processAsync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
+        LOG.trace("Processing exchange asynchronously: {}", exchange);
+
+        // process the exchange asynchronously
+        try {
+            return processor.process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // Order here matters. We need to complete the callbacks
+                    // since they will likely update the exchange with some final results.
+                    try {
+                        callback.done(doneSync);
+                    } finally {
+                        doneUow(uow, exchange);
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
+
+            // fallback and catch any exceptions the process may not have caught
+            // we must ensure to done the UoW in all cases and issue done on the callback
+            exchange.setException(e);
+
+            // Order here matters. We need to complete the callbacks
+            // since they will likely update the exchange with some final results.
+            try {
+                callback.done(true);
+            } finally {
+                doneUow(uow, exchange);
+            }
+            return true;
+        }
+    }
+
     /**
      * Strategy to create the unit of work for the given exchange.
      *

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java?rev=1198338&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/UnitOfWorkSyncProcessTest.java Sun Nov  6 11:26:42 2011
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ *
+ */
+public class UnitOfWorkSyncProcessTest extends ContextTestSupport {
+
+    private static String consumerThread;
+    private static String afterThread;
+    private static String taskThread;
+    private static String doneThread;
+    private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    public void testUnitOfWorkSync() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        assertMockEndpointsSatisfied();
+
+        // should be same thread
+        assertEquals(taskThread, afterThread);
+        // should not be same
+        assertNotSame(doneThread, afterThread);
+        assertNotSame(doneThread, consumerThread);
+        // should be same thread
+        assertEquals(consumerThread, doneThread);
+
+        executorService.shutdownNow();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(new MyEndpoint())
+                    .process(new AsyncProcessor() {
+                        @Override
+                        public boolean process(final Exchange exchange, final AsyncCallback callback) {
+                            executorService.submit(new Runnable() {
+                                @Override
+                                public void run() {
+                                    taskThread = Thread.currentThread().getName();
+                                    try {
+                                        Thread.sleep(100);
+                                    } catch (InterruptedException e) {
+                                        // ignore
+                                    }
+                                    exchange.getIn().setHeader("foo", 123);
+                                    callback.done(false);
+                                }
+                            });
+
+                            return false;
+                        }
+
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // noop
+                        }
+                    })
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            afterThread = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private final class MyEndpoint extends DefaultEndpoint {
+
+        @Override
+        public Producer createProducer() throws Exception {
+            // not supported
+            return null;
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            return new MyConsumer(this, processor);
+        }
+
+        @Override
+        protected String createEndpointUri() {
+            return "myEndpoint://foo";
+        }
+
+        @Override
+        public boolean isSingleton() {
+            return true;
+        }
+
+    }
+
+    private final class MyConsumer implements Consumer {
+        private Processor processor;
+        private Endpoint endpoint;
+
+        private MyConsumer(Endpoint endpoint, Processor processor) {
+            this.endpoint = endpoint;
+            this.processor = processor;
+        }
+
+        @Override
+        public Endpoint getEndpoint() {
+            return endpoint;
+        }
+
+        @Override
+        public void start() throws Exception {
+            consumerThread = Thread.currentThread().getName();
+
+            Exchange exchange = new DefaultExchange(context);
+            exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, true);
+            exchange.addOnCompletion(new SynchronizationAdapter() {
+                @Override
+                public void onDone(Exchange exchange) {
+                    doneThread = Thread.currentThread().getName();
+                }
+            });
+
+            // just fire the exchange when started
+            processor.process(exchange);
+        }
+
+        @Override
+        public void stop() throws Exception {
+            // noop
+        }
+    }
+}