You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/14 16:18:51 UTC

svn commit: r996908 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/util/ test/java/org/apache/camel/component/file/

Author: davsclaus
Date: Tue Sep 14 14:18:50 2010
New Revision: 996908

URL: http://svn.apache.org/viewvc?rev=996908&view=rev
Log:
CAMEL-3108: ConsumerTemplate receiveBody will ensure UoW is done before returning body.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Tue Sep 14 14:18:50 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.spi.Synchronization;
@@ -410,4 +411,11 @@ public interface Exchange {
      */
     void handoverCompletions(Exchange target);
 
+    /**
+     * Hands over all the on completions registered on this exchange to the caller
+     *
+     * @return the on completions, may be <tt>null</tt> if there are none to hand over
+     */
+    List<Synchronization> handoverCompletions();
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Tue Sep 14 14:18:50 2010
@@ -16,12 +16,18 @@
  */
 package org.apache.camel.impl;
 
+import java.util.List;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.UnitOfWorkHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
@@ -34,6 +40,7 @@ import static org.apache.camel.util.Obje
  */
 public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {
 
+    private static final transient Log LOG = LogFactory.getLog(DefaultConsumerTemplate.class);
     private final CamelContext context;
     private ConsumerCache consumerCache;
     private int maximumCacheSize;
@@ -89,8 +96,14 @@ public class DefaultConsumerTemplate ext
     }
 
     public Object receiveBody(String endpointUri) {
+        Object answer = null;
         Exchange exchange = receive(endpointUri);
-        return extractResultBody(exchange);
+        try {
+            answer = extractResultBody(exchange);
+        } finally {
+            doneUoW(exchange);
+        }
+        return answer;
     }
 
     public Object receiveBody(Endpoint endpoint) {
@@ -98,8 +111,14 @@ public class DefaultConsumerTemplate ext
     }
 
     public Object receiveBody(String endpointUri, long timeout) {
+        Object answer = null;
         Exchange exchange = receive(endpointUri, timeout);
-        return extractResultBody(exchange);
+        try {
+            answer = extractResultBody(exchange);
+        } finally {
+            doneUoW(exchange);
+        }
+        return answer;
     }
 
     public Object receiveBody(Endpoint endpoint, long timeout) {
@@ -107,35 +126,65 @@ public class DefaultConsumerTemplate ext
     }
 
     public Object receiveBodyNoWait(String endpointUri) {
+        Object answer = null;
         Exchange exchange = receiveNoWait(endpointUri);
-        return extractResultBody(exchange);
+        try {
+            answer = extractResultBody(exchange);
+        } finally {
+            doneUoW(exchange);
+        }
+        return answer;
     }
 
     public Object receiveBodyNoWait(Endpoint endpoint) {
         return receiveBodyNoWait(endpoint.getEndpointUri());
     }
 
+    @SuppressWarnings("unchecked")
     public <T> T receiveBody(String endpointUri, Class<T> type) {
-        Object body = receiveBody(endpointUri);
-        return context.getTypeConverter().convertTo(type, body);
+        Object answer = null;
+        Exchange exchange = receive(endpointUri);
+        try {
+            answer = extractResultBody(exchange);
+            answer = context.getTypeConverter().convertTo(type, answer);
+        } finally {
+            doneUoW(exchange);
+        }
+        return (T) answer;
     }
 
     public <T> T receiveBody(Endpoint endpoint, Class<T> type) {
         return receiveBody(endpoint.getEndpointUri(), type);
     }
 
+    @SuppressWarnings("unchecked")
     public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) {
-        Object body = receiveBody(endpointUri, timeout);
-        return context.getTypeConverter().convertTo(type, body);
+        Object answer = null;
+        Exchange exchange = receive(endpointUri, timeout);
+        try {
+            answer = extractResultBody(exchange);
+            answer = context.getTypeConverter().convertTo(type, answer);
+        } finally {
+            doneUoW(exchange);
+        }
+        return (T) answer;
     }
 
     public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) {
         return receiveBody(endpoint.getEndpointUri(), timeout, type);
     }
 
+    @SuppressWarnings("unchecked")
     public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) {
-        Object body = receiveBodyNoWait(endpointUri);
-        return context.getTypeConverter().convertTo(type, body);
+        Object answer = null;
+        Exchange exchange = receiveNoWait(endpointUri);
+        try {
+            answer = extractResultBody(exchange);
+            answer = context.getTypeConverter().convertTo(type, answer);
+        } finally {
+            doneUoW(exchange);
+        }
+        return (T) answer;
     }
 
     public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) {
@@ -175,6 +224,37 @@ public class DefaultConsumerTemplate ext
         return answer;
     }
 
+    /**
+     * Completes the work of an exchange handed out by one of the
+     * <tt>receiveBody</tt> methods, so its on completion callbacks are
+     * invoked before the body is returned to the caller.
+     * <p/>
+     * Any exception thrown while completing is logged at WARN level and ignored.
+     *
+     * @param exchange the received exchange, may be <tt>null</tt> if nothing was received
+     */
+    private static void doneUoW(Exchange exchange) {
+        if (exchange == null) {
+            // nothing was received (receive with a timeout and receiveNoWait yield null when no
+            // exchange is available), so there is nothing to complete and no reason to log a WARN
+            return;
+        }
+
+        try {
+            if (exchange.getUnitOfWork() == null) {
+                // no unit of work, so handover the completions and invoke them manually to ensure they are executed
+                List<Synchronization> synchronizations = exchange.handoverCompletions();
+                UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
+            } else {
+                // let the unit of work complete itself
+                exchange.getUnitOfWork().done(exchange);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
+                    + ". This exception will be ignored.", e);
+        }
+    }
+
     private ConsumerCache getConsumerCache() {
         if (!isStarted()) {
             throw new IllegalStateException("ConsumerTemplate has not been started");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Tue Sep 14 14:18:50 2010
@@ -356,6 +356,24 @@ public final class DefaultExchange imple
         }
     }
 
+    /**
+     * Hands over all the on completions held by this exchange to the caller.
+     * <p/>
+     * A copy of the held completions is returned, and the list held by this exchange is cleared and reset.
+     *
+     * @return the on completions, or <tt>null</tt> if this exchange holds no on completion list
+     */
+    public List<Synchronization> handoverCompletions() {
+        if (onCompletions == null) {
+            return null;
+        }
+
+        List<Synchronization> handedOver = new ArrayList<Synchronization>(onCompletions);
+        onCompletions.clear();
+        onCompletions = null;
+        return handedOver;
+    }
+
     /**
      * Configures the message after it has been set on the exchange
      */

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Tue Sep 14 14:18:50 2010
@@ -17,7 +17,6 @@
 package org.apache.camel.impl;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -34,7 +33,7 @@ import org.apache.camel.spi.Synchronizat
 import org.apache.camel.spi.TracedRouteNodes;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.util.EventHelper;
-import org.apache.camel.util.OrderedComparator;
+import org.apache.camel.util.UnitOfWorkHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -170,31 +169,8 @@ public class DefaultUnitOfWork implement
             LOG.warn("Exception occurred during event notification. This exception will be ignored.", e);
         }
 
-        if (synchronizations != null && !synchronizations.isEmpty()) {
-            // reverse so we invoke it FILO style instead of FIFO
-            Collections.reverse(synchronizations);
-            // and honor if any was ordered by sorting it accordingly
-            Collections.sort(synchronizations, new OrderedComparator());
-            // invoke synchronization callbacks
-            for (Synchronization synchronization : synchronizations) {
-                try {
-                    if (failed) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Invoking synchronization.onFailure: " + synchronization + " with " + exchange);
-                        }
-                        synchronization.onFailure(exchange);
-                    } else {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Invoking synchronization.onComplete: " + synchronization + " with " + exchange);
-                        }
-                        synchronization.onComplete(exchange);
-                    }
-                } catch (Exception e) {
-                    // must catch exceptions to ensure all synchronizations have a chance to run
-                    LOG.warn("Exception occurred during onCompletion. This exception will be ignored.", e);
-                }
-            }
-        }
+        // invoke the synchronization callbacks (onComplete or onFailure)
+        UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
 
         // unregister from inflight registry
         if (exchange.getContext() != null) {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java?rev=996908&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java Tue Sep 14 14:18:50 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+import org.apache.commons.logging.Log;
+
+/**
+ * Helper for completing the work of an exchange by invoking its {@link Synchronization} callbacks.
+ *
+ * @version $Revision$
+ */
+public final class UnitOfWorkHelper {
+
+    private UnitOfWorkHelper() {
+        // utility class, not to be instantiated
+    }
+
+    /**
+     * Invokes the given synchronization callbacks for the exchange.
+     * <p/>
+     * The list is reversed and then sorted with an {@link OrderedComparator}, both in place.
+     * Each callback is given <tt>onFailure</tt> when the exchange is failed, otherwise <tt>onComplete</tt>.
+     * An exception thrown by a callback is logged at WARN level and ignored, so the remaining callbacks still run.
+     *
+     * @param exchange         the exchange the callbacks are invoked with
+     * @param synchronizations the callbacks, nothing is invoked if <tt>null</tt> or empty
+     * @param log              the logger used for the trace and warn logging
+     */
+    @SuppressWarnings("unchecked")
+    public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Log log) {
+        // evaluated once up front, before any callback is invoked
+        boolean failed = exchange.isFailed();
+
+        if (synchronizations == null || synchronizations.isEmpty()) {
+            return;
+        }
+
+        // reverse so we invoke it FILO style instead of FIFO
+        Collections.reverse(synchronizations);
+        // and honor if any was ordered by sorting it accordingly
+        Collections.sort(synchronizations, new OrderedComparator());
+
+        for (Synchronization callback : synchronizations) {
+            try {
+                if (log.isTraceEnabled()) {
+                    String action = failed ? "Invoking synchronization.onFailure: " : "Invoking synchronization.onComplete: ";
+                    log.trace(action + callback + " with " + exchange);
+                }
+                if (failed) {
+                    callback.onFailure(exchange);
+                } else {
+                    callback.onComplete(exchange);
+                }
+            } catch (Exception e) {
+                // must catch exceptions to ensure all synchronizations have a chance to run
+                log.warn("Exception occurred during onCompletion. This exception will be ignored.", e);
+            }
+        }
+    }
+
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java?rev=996908&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java Tue Sep 14 14:18:50 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+
+/**
+ * Unit test for receiving the body of a file using the consumer template.
+ *
+ * @version $Revision$
+ */
+public class FileConsumerTemplateTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        // this test defines no routes, it only uses the producer and consumer templates
+        return false;
+    }
+
+    public void testFileConsumerTemplate() throws Exception {
+        // start from a clean directory and write the single file to receive
+        deleteDirectory("target/consumer");
+        template.sendBodyAndHeader("file:target/consumer", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+        String received = consumer.receiveBody("file:target/consumer?delete=true", 5000, String.class);
+        assertEquals("Hello World", received);
+
+        // file should be deleted
+        File consumed = new File("target/consumer/hello.txt").getAbsoluteFile();
+        assertFalse("File should be deleted " + consumed, consumed.exists());
+    }
+
+}