You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/11/14 14:07:04 UTC

svn commit: r1034992 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/model/ main/java/org/apache/camel/util/concurrent/ test/java/org/apache/camel/processor/aggregator/ test/java/org/apache/camel/util/concu...

Author: davsclaus
Date: Sun Nov 14 13:07:04 2010
New Revision: 1034992

URL: http://svn.apache.org/viewvc?rev=1034992&view=rev
Log:
CAMEL-3337: Aggregator uses a synchronous thread pool facade to process completed aggregated exchanges using the caller thread. This avoids any intermediate task queue, which could fill up due to a slow producer and cause memory issues.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sun Nov 14 13:07:04 2010
@@ -255,7 +255,7 @@ public class DefaultExecutorServiceStrat
     }
 
     public ExecutorService newSynchronousThreadPool(Object source, String name) {
-        ExecutorService answer = ExecutorServiceHelper.newSynchronousThreadPool(threadNamePattern, name);
+        ExecutorService answer = ExecutorServiceHelper.newSynchronousThreadPool();
         onThreadPoolCreated(answer);
 
         if (LOG.isDebugEnabled()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sun Nov 14 13:07:04 2010
@@ -159,8 +159,8 @@ public class AggregateDefinition extends
                 // we are running in parallel so create a cached thread pool which grows/shrinks automatic
                 executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Aggregator");
             } else {
-                // use a single threaded if we are not running in parallel
-                executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, "Aggregator");
+                // use a synchronous thread pool if we are not running in parallel (will always use caller thread)
+                executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newSynchronousThreadPool(this, "Aggregator");
             }
         }
         AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy, executorService);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sun Nov 14 13:07:04 2010
@@ -171,17 +171,14 @@ public final class ExecutorServiceHelper
     }
 
     /**
-     * Creates a new synchronous thread pool which executes the task in the caller thread (no task queue)
-     * <p/>
-     * Uses a {@link java.util.concurrent.SynchronousQueue} queue as task queue.
+     * Creates a new synchronous executor service which always executes the task in the caller thread
+     * (it's just a thread pool facade)
      *
-     * @param pattern      pattern of the thread name
-     * @param name         ${name} in the pattern name
      * @return the created pool
+     * @see org.apache.camel.util.concurrent.SynchronousExecutorService
      */
-    public static ExecutorService newSynchronousThreadPool(final String pattern, final String name) {
-        return ExecutorServiceHelper.newThreadPool(pattern, name, 0, 0, 60,
-                TimeUnit.SECONDS, 0, new ThreadPoolExecutor.CallerRunsPolicy(), true);
+    public static ExecutorService newSynchronousThreadPool() {
+        return new SynchronousExecutorService();
     }
 
     /**

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java?rev=1034992&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SynchronousExecutorService.java Sun Nov 14 13:07:04 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A synchronous {@link java.util.concurrent.ExecutorService} which always invokes
+ * the task in the caller thread (just a thread pool facade).
+ * <p/>
+ * There is no task queue, and no thread pool. The task will thus always be executed
+ * by the caller thread in a fully synchronous method invocation.
+ * <p/>
+ * This implementation is very simple and does not support waiting for tasks to complete during shutdown.
+ *
+ * @version $Revision$
+ */
+public class SynchronousExecutorService extends AbstractExecutorService {
+
+    private volatile boolean shutdown; // written only by shutdown(); read by isShutdown() and isTerminated()
+
+    public void shutdown() {
+        shutdown = true; // only flips the flag; execute() below does not consult it
+    }
+
+    public List<Runnable> shutdownNow() {
+        // not implemented: returns null (not an empty list) and leaves the shutdown flag untouched
+        return null;
+    }
+
+    public boolean isShutdown() {
+        return shutdown;
+    }
+
+    public boolean isTerminated() {
+        return shutdown; // same flag as isShutdown(); execute() runs each task to completion before returning
+    }
+
+    public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
+        // not implemented: returns true immediately, ignoring both the timeout and the shutdown flag
+        return true;
+    }
+
+    public void execute(Runnable runnable) {
+        // run the task synchronously in the calling thread; there is no try/catch, so whatever it throws propagates
+        runnable.run();
+    }
+
+}

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateParallelProcessingTest.java Sun Nov 14 13:07:04 2010
@@ -37,7 +37,7 @@ public class AggregateParallelProcessing
                 from("direct:start")
                     .aggregate(header("id"), new BodyInAggregatingStrategy())
                         .eagerCheckCompletion().completionPredicate(body().isEqualTo("END")).parallelProcessing()
-                        .to("mock:result");
+                        .to("log:result", "mock:result");
             }
         });
         context.start();
@@ -62,7 +62,7 @@ public class AggregateParallelProcessing
                 from("direct:start")
                     .aggregate(header("id"), new BodyInAggregatingStrategy())
                         .eagerCheckCompletion().completionPredicate(body().isEqualTo("END"))
-                        .to("mock:result");
+                        .to("log:result", "mock:result");
             }
         });
         context.start();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java Sun Nov 14 13:07:04 2010
@@ -66,7 +66,7 @@ public class ExecutorServiceHelperTest e
     }
 
     public void testNewSynchronousThreadPool() {
-        ExecutorService pool = ExecutorServiceHelper.newSynchronousThreadPool("MyPool ${name}", "foo");
+        ExecutorService pool = ExecutorServiceHelper.newSynchronousThreadPool();
         assertNotNull(pool);
     }
 

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java?rev=1034992&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SynchronousExecutorServiceTest.java Sun Nov 14 13:07:04 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class SynchronousExecutorServiceTest extends TestCase {
+
+    private static boolean invoked; // static and never reset, so the value is shared by both test methods
+    private static String name1; // name of the thread that calls execute()
+    private static String name2; // name of the thread that actually ran the task
+
+    public void testSynchronousExecutorService() throws Exception {
+        name1 = Thread.currentThread().getName();
+
+        ExecutorService service = new SynchronousExecutorService();
+        service.execute(new Runnable() {
+            public void run() {
+                invoked = true;
+                name2 = Thread.currentThread().getName();
+            }
+        });
+
+        assertTrue("Should have been invoked", invoked); // execute() has already run the task when it returns
+        assertEquals("Should use same thread", name1, name2); // compares thread names, not Thread instances
+    }
+
+    public void testSynchronousExecutorServiceShutdown() throws Exception {
+        ExecutorService service = new SynchronousExecutorService();
+        service.execute(new Runnable() {
+            public void run() {
+                invoked = true; // NOTE(review): this test never asserts on 'invoked'
+            }
+        });
+        service.shutdown();
+
+        assertTrue(service.isShutdown()); // both accessors return the flag set by shutdown()
+        assertTrue(service.isTerminated());
+    }
+}

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1034992&r1=1034991&r2=1034992&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Sun Nov 14 13:07:04 2010
@@ -43,6 +43,7 @@ log4j.logger.org.apache.camel.impl.Defau
 #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
 #log4j.logger.org.apache.camel.processor.Delayer=TRACE
 #log4j.logger.org.apache.camel.processor.Throttler=TRACE
+#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=DEBUG
 #log4j.logger.org.apache.camel.impl=TRACE
 #log4j.logger.org.apache.camel.util.FileUtil=TRACE
 #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE