You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/09/02 11:13:51 UTC

svn commit: r1164422 - in /camel/branches/camel-2.8.x: ./ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/test/java/org/apache/camel/management/ camel-core/src/test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Fri Sep  2 09:13:51 2011
New Revision: 1164422

URL: http://svn.apache.org/viewvc?rev=1164422&view=rev
Log:
Merged revisions 1159682 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk


Added:
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
      - copied unchanged from r1159682, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep  2 09:13:51 2011
@@ -1 +1 @@
-/camel/trunk:1150651,1151054,1152170,1155230,1156108,1156260,1156524,1157348,1157798,1157831,1157878,1158153,1159171,1159174,1159457,1159460,1159606,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395
+/camel/trunk:1150651,1151054,1152170,1155230,1156108,1156260,1156524,1157348,1157798,1157831,1157878,1158153,1159171,1159174,1159457,1159460,1159606,1159682,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1164422&r1=1164421&r2=1164422&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Fri Sep  2 09:13:51 2011
@@ -259,11 +259,12 @@ public class ProducerCache extends Servi
      * @param producerCallback the producer template callback to be executed
      * @return (doneSync) <tt>true</tt> to continue executing synchronously, <tt>false</tt> to continue being executed asynchronously
      */
-    public boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, AsyncCallback callback, AsyncProducerCallback producerCallback) {
+    public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
+                                     final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
         boolean sync = true;
 
         // get the producer; we do not mind if it is pooled, as we can handle returning it back to the pool
-        Producer producer = doGetProducer(endpoint, true);
+        final Producer producer = doGetProducer(endpoint, true);
 
         if (producer == null) {
             if (isStopped()) {
@@ -274,39 +275,44 @@ public class ProducerCache extends Servi
             }
         }
 
-        StopWatch watch = null;
-        if (exchange != null) {
-            // record timing for sending the exchange using the producer
-            watch = new StopWatch();
-        }
+        // record timing for sending the exchange using the producer
+        final StopWatch watch = exchange != null ? new StopWatch() : null;
 
         try {
             // invoke the callback
             AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(producer);
-            sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, callback);
+            sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    try {
+                        if (watch != null) {
+                            long timeTaken = watch.stop();
+                            // emit event that the exchange was sent to the endpoint
+                            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
+                        }
+
+                        if (producer instanceof ServicePoolAware) {
+                            // release back to the pool
+                            pool.release(endpoint, producer);
+                        } else if (!producer.isSingleton()) {
+                            // stop non singleton producers as we should not leak resources
+                            try {
+                                ServiceHelper.stopService(producer);
+                            } catch (Exception e) {
+                                // ignore and continue
+                                LOG.warn("Error stopping producer: " + producer, e);
+                            }
+                        }
+                    } finally {
+                        callback.done(doneSync);
+                    }
+                }
+            });
         } catch (Throwable e) {
             // ensure any exception is caught and set on the exchange
             if (exchange != null) {
                 exchange.setException(e);
             }
-        } finally {
-            if (exchange != null && exchange.getException() == null) {
-                long timeTaken = watch.stop();
-                // emit event that the exchange was sent to the endpoint
-                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
-            }
-            if (producer instanceof ServicePoolAware) {
-                // release back to the pool
-                pool.release(endpoint, producer);
-            } else if (!producer.isSingleton()) {
-                // stop non singleton producers as we should not leak resources
-                try {
-                    ServiceHelper.stopService(producer);
-                } catch (Exception e) {
-                    // ignore and continue
-                    LOG.warn("Error stopping producer: " + producer, e);
-                }
-            }
         }
 
         return sync;

Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java?rev=1164422&r1=1164421&r2=1164422&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java Fri Sep  2 09:13:51 2011
@@ -94,17 +94,19 @@ public class EventNotifierFailureHandled
         assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
         assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
         assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
 
-        ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
+        ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
         assertEquals("should be DLC", true, e.isDeadLetterChannel());
         SendProcessor send = assertIsInstanceOf(SendProcessor.class, e.getFailureHandler());
         assertEquals("mock://dead", send.getDestination().getEndpointUri());
 
         // dead letter channel will mark the exchange as completed
-        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
-        // and the sent will be logged after they are complete sending as it record the time taken as well
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
+        // and the last event should be the direct:start
         assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+        ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
+        assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
     }
 
     public void testExchangeOnException() throws Exception {
@@ -127,14 +129,17 @@ public class EventNotifierFailureHandled
         assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
         assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
         assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
 
-        ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
+        ExchangeFailureHandledEvent e = assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
         assertEquals("should NOT be DLC", false, e.isDeadLetterChannel());
 
         // onException will handle the exception
-        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
+        // and the last event should be the direct:start
         assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+        ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
+        assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
     }
 
 }

Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java?rev=1164422&r1=1164421&r2=1164422&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java Fri Sep  2 09:13:51 2011
@@ -100,9 +100,9 @@ public class EventNotifierRedeliveryEven
         assertEquals(3, e.getAttempt());
         e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
         assertEquals(4, e.getAttempt());
-        assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
-        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(5));
+        assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(6));
+        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(7));
         assertIsInstanceOf(ExchangeSentEvent.class, events.get(8));
     }
 
@@ -134,7 +134,6 @@ public class EventNotifierRedeliveryEven
         assertEquals(3, e.getAttempt());
         e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
         assertEquals(4, e.getAttempt());
-        assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
 
         // since it's async, the ordering of the remaining events can differ depending on OS and timing
     }