You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/18 11:47:50 UTC

svn commit: r755541 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/management/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/resequencer/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Wed Mar 18 10:47:49 2009
New Revision: 755541

URL: http://svn.apache.org/viewvc?rev=755541&view=rev
Log:
Using ExecutorService for JMXConnector thread. Background threads is now daemon.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java Wed Mar 18 10:47:49 2009
@@ -25,7 +25,9 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
@@ -58,6 +60,7 @@
     public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
     private static final transient Log LOG = LogFactory.getLog(DefaultInstrumentationAgent.class);
 
+    private ExecutorService executorService;
     private MBeanServer server;
     private Set<ObjectName> mbeans = new HashSet<ObjectName>();
     private MetadataMBeanInfoAssembler assembler;
@@ -107,34 +110,74 @@
         registryPort = value;
     }
 
+    public Integer getRegistryPort() {
+        return registryPort;
+    }
+
     public void setConnectorPort(Integer value) {
         connectorPort = value;
     }
 
+    public Integer getConnectorPort() {
+        return connectorPort;
+    }
+
     public void setMBeanServerDefaultDomain(String value) {
         mBeanServerDefaultDomain = value;
     }
 
+    public String getMBeanServerDefaultDomain() {
+        return mBeanServerDefaultDomain;
+    }
+
     public void setMBeanObjectDomainName(String value) {
         mBeanObjectDomainName = value;
     }
 
+    public String getMBeanObjectDomainName() {
+        return mBeanObjectDomainName;
+    }
+
     public void setServiceUrlPath(String value) {
         serviceUrlPath = value;
     }
 
+    public String getServiceUrlPath() {
+        return serviceUrlPath;
+    }
+
     public void setCreateConnector(Boolean flag) {
         createConnector = flag;
     }
 
+    public Boolean getCreateConnector() {
+        return createConnector;
+    }
+
     public void setUsePlatformMBeanServer(Boolean flag) {
         usePlatformMBeanServer = flag;
     }
 
+    public Boolean getUsePlatformMBeanServer() {
+        return usePlatformMBeanServer;
+    }
+
+    public void setServer(MBeanServer value) {
+        server = value;
+    }
+
     public MBeanServer getMBeanServer() {
         return server;
     }
 
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
     public void register(Object obj, ObjectName name) throws JMException {
         register(obj, name, false);
     }
@@ -238,7 +281,7 @@
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Registered MBean with objectname: " + registeredName);
             }
-            
+
             mbeans.add(registeredName);
         }
     }
@@ -295,7 +338,7 @@
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Found MBeanServer with default domain " + server.getDefaultDomain());
             }
-            
+
             if (mBeanServerDefaultDomain.equals(server.getDefaultDomain())) {
                 return server;
             }
@@ -316,8 +359,7 @@
         }
 
         // Create an RMI connector and start it
-        JMXServiceURL url;
-
+        final JMXServiceURL url;
         if (connectorPort > 0) {
             url = new JMXServiceURL("service:jmx:rmi://" + host + ":" + connectorPort + "/jndi/rmi://" + host
                                     + ":" + registryPort + serviceUrlPath);
@@ -325,10 +367,22 @@
             url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + registryPort
                                     + serviceUrlPath);
         }
+
         cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, server);
 
-        // Start the connector server asynchronously (in a separate thread).
-        Thread connectorThread = new Thread() {
+        if (executorService == null) {
+            // we only need a single for the JMX connector
+            executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, getThreadName("Camel JMXConnector: " + url));
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+        }
+
+        // execute the JMX connector
+        executorService.execute(new Runnable() {
             public void run() {
                 try {
                     cs.start();
@@ -336,18 +390,9 @@
                     LOG.warn("Could not start JMXConnector thread.", ioe);
                 }
             }
-        };
-        connectorThread.setName("Camel JMX Connector Thread [" + url + "]");
-        connectorThread.start();
-        LOG.info("JMX Connector thread started and listening at: " + url);
-    }
-
-    public String getMBeanObjectDomainName() {
-        return mBeanObjectDomainName;
-    }
+        });
 
-    public void setServer(MBeanServer value) {
-        server = value;
+        LOG.info("JMX Connector thread started and listening at: " + url);
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Wed Mar 18 10:47:49 2009
@@ -48,14 +48,15 @@
 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
 
-    private static Timer timer = new Timer();
-    private final Processor output;
+    // we can use a single shared static timer for async redeliveries
+    private static final Timer REDELIVER_TIMER = new Timer("Camel DeadLetterChannel Redeliver Timer", true);
     private final Processor deadLetter;
     private final String deadLetterUri;
+    private final Processor output;
     private final AsyncProcessor outputAsync;
+    private final Processor redeliveryProcessor;
     private RedeliveryPolicy redeliveryPolicy;
     private Logger logger;
-    private final Processor redeliveryProcessor;
 
     private class RedeliveryData {
         int redeliveryCounter;
@@ -94,7 +95,7 @@
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
                     if (exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
-                        // if we are redelivering then sleep before trying again
+                        // deliver to async to process it
                         asyncProcess(exchange, callback, data);
                     } else {
                         callback.done(sync);
@@ -275,9 +276,9 @@
             if (exchange.getException() != null) {
                 exchange.setException(null);
             }
-            // wait until we should redeliver
+            // wait until we should redeliver using a timer to avoid thread blocking
             data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
-            timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
+            REDELIVER_TIMER.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
 
             // letting onRedeliver be executed
             deliverToRedeliveryProcessor(exchange, callback, data);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Wed Mar 18 10:47:49 2009
@@ -99,7 +99,7 @@
     }
 
     public void start() {
-        timer = new Timer("Stream Resequencer Timer");
+        timer = new Timer("Camel Stream Resequencer Timer", true);
     }
 
     /**

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Wed Mar 18 10:47:49 2009
@@ -92,7 +92,8 @@
                 Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
                 int attempt = (counter == null) ? 1 : counter + 1;
                 if (attempt > 1) {
-                    assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(), "Timer-0");
+                    assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(),
+                            "Camel DeadLetterChannel Redeliver Timer");
                 }
                 
                 if (attempt < failUntilAttempt) {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java Wed Mar 18 10:47:49 2009
@@ -60,11 +60,11 @@
                 // START SNIPPET: example                
                 // we need to normalize two types of incoming messages
                 from("direct:start")
-                  .choice()
-                    .when().xpath("/employee").to("bean:normalizer?method=employeeToPerson")
-                    .when().xpath("/customer").to("bean:normalizer?method=customerToPerson")
-                  .end()
-                  .to("mock:result");               
+                    .choice()
+                        .when().xpath("/employee").to("bean:normalizer?method=employeeToPerson")
+                        .when().xpath("/customer").to("bean:normalizer?method=customerToPerson")
+                    .end()
+                    .to("mock:result");
                 // END SNIPPET: example
             }
         };