You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/06 16:26:19 UTC

svn commit: r772276 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/ camel-core...

Author: davsclaus
Date: Wed May  6 14:26:10 2009
New Revision: 772276

URL: http://svn.apache.org/viewvc?rev=772276&view=rev
Log:
CAMEL-1587: Using ExecutorServiceHelper to create thread pools with Camel identified thread names.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed May  6 14:26:10 2009
@@ -58,14 +58,10 @@
 
     String ROUTE_STOP = "CamelRouteStop";
 
-    /**
-     * @deprecated a new Async API is planned for Camel 2.0
-     */
-    String PROCESSED_SYNC = "CamelProcessedSync";
-
     String REDELIVERED = "CamelRedelivered";
     String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
 
+    String MULTICAST_INDEX = "CamelMulticastIndex";
     String SPLIT_INDEX = "CamelSplitIndex";
     String SPLIT_SIZE = "CamelSplitSize";
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed May  6 14:26:10 2009
@@ -18,13 +18,12 @@
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -81,16 +80,9 @@
     }
 
     protected void doStart() throws Exception {
-        int concurrentConsumers = endpoint.getConcurrentConsumers();
-        executor = Executors.newFixedThreadPool(concurrentConsumers, new ThreadFactory() {
-        
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, getThreadName(endpoint.getEndpointUri()));
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
-        for (int i = 0; i < concurrentConsumers; i++) {
+        int poolSize = endpoint.getConcurrentConsumers();
+        executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, endpoint.getEndpointUri(), true);
+        for (int i = 0; i < poolSize; i++) {
             executor.execute(this);
         }
         endpoint.onStarted(this);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java Wed May  6 14:26:10 2009
@@ -23,8 +23,6 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
-
 
 /**
  * A grouped exchange that groups together other exchanges, as a holder object.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed May  6 14:26:10 2009
@@ -87,15 +87,6 @@
         }
     }
 
-    public static boolean isProcessedSync(Exchange exchange) {
-        Boolean rc = exchange.getProperty(Exchange.PROCESSED_SYNC, Boolean.class);
-        return rc == null ? false : rc;
-    }
-
-    public static void setProcessedSync(Exchange exchange, boolean sync) {
-        exchange.setProperty(Exchange.PROCESSED_SYNC, sync ? Boolean.TRUE : Boolean.FALSE);
-    }
-
     /**
      * Sends an exchange to an endpoint using a supplied
      * {@link Processor} to populate the exchange

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Wed May  6 14:26:10 2009
@@ -20,7 +20,6 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.camel.CamelException;
 import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.util.ObjectHelper;
@@ -33,7 +32,6 @@
  * @version $Revision$
  */
 public abstract class ServiceSupport implements Service {
-    private static int threadCounter;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicBoolean starting = new AtomicBoolean(false);
     private final AtomicBoolean stopping = new AtomicBoolean(false);
@@ -156,17 +154,7 @@
 
     protected abstract void doStop() throws Exception;
 
-    /**
-     * Creates a new thread name with the given prefix
-     */
-    protected String getThreadName(String prefix) {
-        return prefix + " thread:" + nextThreadCounter();
-    }
-
-    protected static synchronized int nextThreadCounter() {
-        return ++threadCounter;
-    }
-
+    @SuppressWarnings("unchecked")
     protected void addChildService(Object childService) {
         synchronized (this) {
             if (childServices == null) {
@@ -177,7 +165,7 @@
     }
 
     protected boolean removeChildService(Object childService) {
-        return childServices != null ? childServices.remove(childService) : false;
+        return childServices != null && childServices.remove(childService);
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java Wed May  6 14:26:10 2009
@@ -43,6 +43,7 @@
 
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.InstrumentationAgent;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.jmx.export.annotation.AnnotationJmxAttributeSource;
@@ -384,13 +385,7 @@
 
         if (executorService == null) {
             // we only need a single for the JMX connector
-            executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
-                public Thread newThread(Runnable runnable) {
-                    Thread thread = new Thread(runnable, getThreadName("Camel JMXConnector: " + url));
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            });
+            executorService = ExecutorServiceHelper.newSingleThreadExecutor("JMXConnector: " + url, true);
         }
 
         // execute the JMX connector

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed May  6 14:26:10 2009
@@ -53,8 +53,7 @@
 
     private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
 
-    // TODO: Add more logging
-    // TODO: Add option to stop if an exception was thrown during processing to break asap
+    // TODO: Add option to stop if an exception was thrown during processing to break asap (future task cancel)
 
     /**
      * Class that represent each step in the multicast route to do
@@ -117,9 +116,9 @@
         final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
         
         if (isParallelProcessing() && isStreaming()) {
-            doProcessNewParallelStreaming(result, pairs);
+            doProcessParallelStreaming(result, pairs);
         } else if (isParallelProcessing()) {
-            doProcessNewParallel(result, pairs);
+            doProcessParallel(result, pairs);
         } else {
             doProcessSequntiel(result, pairs);
         }
@@ -129,7 +128,7 @@
         }
     }
 
-    protected void doProcessNewParallelStreaming(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws InterruptedException, ExecutionException {
+    protected void doProcessParallelStreaming(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws InterruptedException, ExecutionException {
         // execute tasks in parallel and aggregate in the order they are finished (out of order sequence)
 
         CompletionService<Exchange> completion = new ExecutorCompletionService<Exchange>(executorService);
@@ -147,6 +146,9 @@
                     } catch (Exception e) {
                         subExchange.setException(e);
                     }
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Parallel streaming processing complete for exchange: " + subExchange);
+                    }
                     return subExchange;
                 }
             });
@@ -162,18 +164,14 @@
             }
         }
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Done parallel streaming processing " + total + " exchanges");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Done parallel streaming processing " + total + " exchanges");
         }
     }
 
-    protected void doProcessNewParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws InterruptedException {
+    protected void doProcessParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws InterruptedException {
         // execute tasks in parallel but aggregate in the same order as the tasks was submitted (in order sequence)
 
-        // TODO I wonder if there is a completion servce that can order the take in the same order as the tasks
-        // was submitted, if so we can do aggregate to catch-up while still processing for more performance
-        // this one completes all tasks before doing aggregation
-
         final List<Exchange> ordered = new ArrayList<Exchange>();
         final CountingLatch latch = new CountingLatch();
         int total = 0;
@@ -195,6 +193,9 @@
                     } catch (Exception e) {
                         subExchange.setException(e);
                     }
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Parallel processing complete for exchange: " + subExchange);
+                    }
                     // this task is done so decrement
                     latch.decrement();
                 }
@@ -238,8 +239,8 @@
             total++;
         }
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Done sequientel processing " + total + " exchanges");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Done sequientel processing " + total + " exchanges");
         }
     }
 
@@ -259,8 +260,8 @@
         }
     }
 
-    protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> allPairs) {
-        // No updates needed
+    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs) {
+        exchange.getIn().setHeader(Exchange.MULTICAST_INDEX, index);
     }
 
     protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
@@ -314,6 +315,14 @@
         return isParallelProcessing;
     }
 
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
     public List<Processor> next() {
         if (!hasNext()) {
             return null;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Wed May  6 14:26:10 2009
@@ -124,6 +124,7 @@
 
     @Override
     protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs) {
+        super.updateNewExchange(exchange, index, allPairs);
         exchange.getIn().setHeader(Exchange.SPLIT_INDEX, index);
         if (allPairs instanceof Collection) {
             exchange.getIn().setHeader(Exchange.SPLIT_SIZE, ((Collection) allPairs).size());

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=772276&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Wed May  6 14:26:10 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Helper for {@link java.util.concurrent.ExecutorService} to construct executors using a thread factory that
+ * create thread names with Camel prefix.
+ *
+ * @version $Revision$
+ */
+public final class ExecutorServiceHelper {
+
+    private static int threadCounter;
+
+    private ExecutorServiceHelper() {
+    }
+
+    /**
+     * Creates a new thread name with the given prefix
+     */
+    protected static String getThreadName(String name) {
+        return "Camel " + name + " thread:" + nextThreadCounter();
+    }
+
+    protected static synchronized int nextThreadCounter() {
+        return ++threadCounter;
+    }
+
+    public static ExecutorService newScheduledThreadPool(final int poolSize, final String name, final boolean daemon) {
+        return Executors.newScheduledThreadPool(poolSize, new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread answer = new Thread(r, getThreadName(name));
+                answer.setDaemon(daemon);
+                return answer;
+            }
+        });
+    }
+
+    public static ExecutorService newFixedThreadPool(final int poolSize, final String name, final boolean daemon) {
+        return Executors.newFixedThreadPool(poolSize, new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread answer = new Thread(r, getThreadName(name));
+                answer.setDaemon(daemon);
+                return answer;
+            }
+        });
+    }
+
+    public static ExecutorService newSingleThreadExecutor(final String name, final boolean daemon) {
+        return Executors.newSingleThreadExecutor(new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread answer = new Thread(r, getThreadName(name));
+                answer.setDaemon(daemon);
+                return answer;
+            }
+        });
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=772276&r1=772275&r2=772276&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed May  6 14:26:10 2009
@@ -28,8 +28,6 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -37,6 +35,7 @@
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -74,13 +73,7 @@
             inputStream = resolveStreamFromUrl();
         }
 
-        executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, getThreadName(endpoint.getEndpointUri()));
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+        executor = ExecutorServiceHelper.newSingleThreadExecutor(endpoint.getEndpointUri(), true);
         executor.execute(this);
     }