You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/14 16:18:51 UTC
svn commit: r996908 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/impl/
main/java/org/apache/camel/util/ test/java/org/apache/camel/component/file/
Author: davsclaus
Date: Tue Sep 14 14:18:50 2010
New Revision: 996908
URL: http://svn.apache.org/viewvc?rev=996908&view=rev
Log:
CAMEL-3108: ConsumerTemplate receiveBody will ensure UoW is done before returning body.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Tue Sep 14 14:18:50 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel;
+import java.util.List;
import java.util.Map;
import org.apache.camel.spi.Synchronization;
@@ -410,4 +411,11 @@ public interface Exchange {
*/
void handoverCompletions(Exchange target);
+ /**
+ * Handover all the on completions from this exchange to the caller.
+ * <p/>
+ * NOTE(review): the DefaultExchange implementation added in this commit removes
+ * the on completions from the exchange as part of the handover, and returns
+ * <tt>null</tt> when the exchange holds no on completions - confirm whether
+ * other implementations are expected to do the same.
+ *
+ * @return the on completions
+ */
+ List<Synchronization> handoverCompletions();
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Tue Sep 14 14:18:50 2010
@@ -16,12 +16,18 @@
*/
package org.apache.camel.impl;
+import java.util.List;
+
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.UnitOfWorkHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
@@ -34,6 +40,7 @@ import static org.apache.camel.util.Obje
*/
public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {
+ private static final transient Log LOG = LogFactory.getLog(DefaultConsumerTemplate.class);
private final CamelContext context;
private ConsumerCache consumerCache;
private int maximumCacheSize;
@@ -89,8 +96,14 @@ public class DefaultConsumerTemplate ext
}
public Object receiveBody(String endpointUri) {
+ Object answer = null;
Exchange exchange = receive(endpointUri);
- return extractResultBody(exchange);
+ try {
+ answer = extractResultBody(exchange);
+ } finally {
+ doneUoW(exchange);
+ }
+ return answer;
}
public Object receiveBody(Endpoint endpoint) {
@@ -98,8 +111,14 @@ public class DefaultConsumerTemplate ext
}
public Object receiveBody(String endpointUri, long timeout) {
+ Object answer = null;
Exchange exchange = receive(endpointUri, timeout);
- return extractResultBody(exchange);
+ try {
+ answer = extractResultBody(exchange);
+ } finally {
+ doneUoW(exchange);
+ }
+ return answer;
}
public Object receiveBody(Endpoint endpoint, long timeout) {
@@ -107,35 +126,65 @@ public class DefaultConsumerTemplate ext
}
public Object receiveBodyNoWait(String endpointUri) {
+ Object answer = null;
Exchange exchange = receiveNoWait(endpointUri);
- return extractResultBody(exchange);
+ try {
+ answer = extractResultBody(exchange);
+ } finally {
+ doneUoW(exchange);
+ }
+ return answer;
}
public Object receiveBodyNoWait(Endpoint endpoint) {
return receiveBodyNoWait(endpoint.getEndpointUri());
}
+ @SuppressWarnings("unchecked")
public <T> T receiveBody(String endpointUri, Class<T> type) {
- Object body = receiveBody(endpointUri);
- return context.getTypeConverter().convertTo(type, body);
+ Object answer = null;
+ Exchange exchange = receive(endpointUri);
+ try {
+ answer = extractResultBody(exchange);
+ answer = context.getTypeConverter().convertTo(type, answer);
+ } finally {
+ doneUoW(exchange);
+ }
+ return (T) answer;
}
public <T> T receiveBody(Endpoint endpoint, Class<T> type) {
return receiveBody(endpoint.getEndpointUri(), type);
}
+ @SuppressWarnings("unchecked")
public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) {
- Object body = receiveBody(endpointUri, timeout);
- return context.getTypeConverter().convertTo(type, body);
+ Object answer = null;
+ Exchange exchange = receive(endpointUri, timeout);
+ try {
+ answer = extractResultBody(exchange);
+ answer = context.getTypeConverter().convertTo(type, answer);
+ } finally {
+ doneUoW(exchange);
+ }
+ return (T) answer;
}
public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) {
return receiveBody(endpoint.getEndpointUri(), timeout, type);
}
+ @SuppressWarnings("unchecked")
public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) {
- Object body = receiveBodyNoWait(endpointUri);
- return context.getTypeConverter().convertTo(type, body);
+ Object answer = null;
+ Exchange exchange = receiveNoWait(endpointUri);
+ try {
+ answer = extractResultBody(exchange);
+ answer = context.getTypeConverter().convertTo(type, answer);
+ } finally {
+ doneUoW(exchange);
+ }
+ return (T) answer;
}
public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) {
@@ -175,6 +224,22 @@ public class DefaultConsumerTemplate ext
return answer;
}
+    /**
+     * Completes the work attached to an exchange handed out by one of the
+     * <tt>receiveBody</tt> methods, so its completion callbacks run before the
+     * body is returned to the caller.
+     * <p/>
+     * Any failure while doing so is logged at WARN level and otherwise ignored.
+     *
+     * @param exchange the received exchange, may be <tt>null</tt> when nothing was received
+     */
+    private static void doneUoW(Exchange exchange) {
+        // nothing was received, so there is nothing to complete; without this guard the
+        // null dereference below is caught and logged as a misleading WARN
+        if (exchange == null) {
+            return;
+        }
+        try {
+            if (exchange.getUnitOfWork() == null) {
+                // no unit of work is attached: take the on completions off the exchange
+                // and done them manually to ensure they are being executed
+                List<Synchronization> synchronizations = exchange.handoverCompletions();
+                UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
+            } else {
+                // done the unit of work
+                exchange.getUnitOfWork().done(exchange);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
+                    + ". This exception will be ignored.", e);
+        }
+    }
+
private ConsumerCache getConsumerCache() {
if (!isStarted()) {
throw new IllegalStateException("ConsumerTemplate has not been started");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Tue Sep 14 14:18:50 2010
@@ -356,6 +356,16 @@ public final class DefaultExchange imple
}
}
+    /**
+     * Detaches the on completions from this exchange and hands them to the caller.
+     * <p/>
+     * After the call this exchange no longer references any on completions.
+     *
+     * @return a copy of the on completions held so far, or <tt>null</tt> if this
+     *         exchange held no on completion list
+     */
+    public List<Synchronization> handoverCompletions() {
+        if (onCompletions == null) {
+            return null;
+        }
+        // snapshot before clearing, as clear() empties the list we are about to hand out
+        List<Synchronization> handedOver = new ArrayList<Synchronization>(onCompletions);
+        onCompletions.clear();
+        onCompletions = null;
+        return handedOver;
+    }
+
/**
* Configures the message after it has been set on the exchange
*/
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=996908&r1=996907&r2=996908&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Tue Sep 14 14:18:50 2010
@@ -17,7 +17,6 @@
package org.apache.camel.impl;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -34,7 +33,7 @@ import org.apache.camel.spi.Synchronizat
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.EventHelper;
-import org.apache.camel.util.OrderedComparator;
+import org.apache.camel.util.UnitOfWorkHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -170,31 +169,8 @@ public class DefaultUnitOfWork implement
LOG.warn("Exception occurred during event notification. This exception will be ignored.", e);
}
- if (synchronizations != null && !synchronizations.isEmpty()) {
- // reverse so we invoke it FILO style instead of FIFO
- Collections.reverse(synchronizations);
- // and honor if any was ordered by sorting it accordingly
- Collections.sort(synchronizations, new OrderedComparator());
- // invoke synchronization callbacks
- for (Synchronization synchronization : synchronizations) {
- try {
- if (failed) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Invoking synchronization.onFailure: " + synchronization + " with " + exchange);
- }
- synchronization.onFailure(exchange);
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Invoking synchronization.onComplete: " + synchronization + " with " + exchange);
- }
- synchronization.onComplete(exchange);
- }
- } catch (Exception e) {
- // must catch exceptions to ensure all synchronizations have a chance to run
- LOG.warn("Exception occurred during onCompletion. This exception will be ignored.", e);
- }
- }
- }
+ // done the synchronizations
+ UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
// unregister from inflight registry
if (exchange.getContext() != null) {
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java?rev=996908&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java Tue Sep 14 14:18:50 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+import org.apache.commons.logging.Log;
+
+/**
+ * Helper for running the {@link Synchronization} callbacks of an {@link Exchange}.
+ *
+ * @version $Revision$
+ */
+public final class UnitOfWorkHelper {
+
+    private UnitOfWorkHelper() {
+        // static helper methods only
+    }
+
+    /**
+     * Invokes the given synchronization callbacks for the exchange.
+     * <p/>
+     * Each callback receives <tt>onFailure</tt> when the exchange is failed and
+     * <tt>onComplete</tt> otherwise. An exception thrown by a callback is logged
+     * at WARN level and ignored, so the remaining callbacks still run.
+     * <p/>
+     * Note that the given list is reversed and sorted in place.
+     *
+     * @param exchange         the exchange passed to every callback
+     * @param synchronizations the callbacks to invoke, may be <tt>null</tt> or empty
+     * @param log              the logger used for trace and warn output
+     */
+    @SuppressWarnings("unchecked")
+    public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Log log) {
+        final boolean failed = exchange.isFailed();
+
+        if (synchronizations == null || synchronizations.isEmpty()) {
+            return;
+        }
+
+        // reverse so the callbacks are invoked FILO style instead of FIFO
+        Collections.reverse(synchronizations);
+        // and honor if any was ordered by sorting it accordingly
+        Collections.sort(synchronizations, new OrderedComparator());
+
+        for (Synchronization callback : synchronizations) {
+            try {
+                if (failed) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Invoking synchronization.onFailure: " + callback + " with " + exchange);
+                    }
+                    callback.onFailure(exchange);
+                } else {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Invoking synchronization.onComplete: " + callback + " with " + exchange);
+                    }
+                    callback.onComplete(exchange);
+                }
+            } catch (Exception e) {
+                // must catch exceptions to ensure all synchronizations have a chance to run
+                log.warn("Exception occurred during onCompletion. This exception will be ignored.", e);
+            }
+        }
+    }
+
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java?rev=996908&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateTest.java Tue Sep 14 14:18:50 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+
+/**
+ * Tests receiving a file body through the consumer template and checks that the
+ * file is gone afterwards when the endpoint uses <tt>delete=true</tt>.
+ *
+ * @version $Revision$
+ */
+public class FileConsumerTemplateTest extends ContextTestSupport {
+
+ // NOTE(review): presumably tells ContextTestSupport not to set up any routes,
+ // as the test only uses the producer and consumer templates - confirm
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ /**
+ * Writes <tt>hello.txt</tt> into <tt>target/consumer</tt>, receives its body as a
+ * String with a 5000 timeout and asserts the file no longer exists afterwards.
+ */
+ public void testFileConsumerTemplate() throws Exception {
+ // start from a clean directory so only the file written below is present
+ deleteDirectory("target/consumer");
+ template.sendBodyAndHeader("file:target/consumer", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+ String body = consumer.receiveBody("file:target/consumer?delete=true", 5000, String.class);
+ assertEquals("Hello World", body);
+
+ // file should be deleted
+ File file = new File("target/consumer/hello.txt").getAbsoluteFile();
+ assertFalse("File should be deleted " + file, file.exists());
+ }
+
+}