You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/18 11:47:50 UTC
svn commit: r755541 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/management/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/resequencer/
test/java/org/apache/camel/processor/
Author: davsclaus
Date: Wed Mar 18 10:47:49 2009
New Revision: 755541
URL: http://svn.apache.org/viewvc?rev=755541&view=rev
Log:
Using ExecutorService for JMXConnector thread. Background threads are now daemon threads.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultInstrumentationAgent.java Wed Mar 18 10:47:49 2009
@@ -25,7 +25,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
@@ -58,6 +60,7 @@
public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
private static final transient Log LOG = LogFactory.getLog(DefaultInstrumentationAgent.class);
+ private ExecutorService executorService;
private MBeanServer server;
private Set<ObjectName> mbeans = new HashSet<ObjectName>();
private MetadataMBeanInfoAssembler assembler;
@@ -107,34 +110,74 @@
registryPort = value;
}
+ public Integer getRegistryPort() {
+ return registryPort;
+ }
+
public void setConnectorPort(Integer value) {
connectorPort = value;
}
+ public Integer getConnectorPort() {
+ return connectorPort;
+ }
+
public void setMBeanServerDefaultDomain(String value) {
mBeanServerDefaultDomain = value;
}
+ public String getMBeanServerDefaultDomain() {
+ return mBeanServerDefaultDomain;
+ }
+
public void setMBeanObjectDomainName(String value) {
mBeanObjectDomainName = value;
}
+ public String getMBeanObjectDomainName() {
+ return mBeanObjectDomainName;
+ }
+
public void setServiceUrlPath(String value) {
serviceUrlPath = value;
}
+ public String getServiceUrlPath() {
+ return serviceUrlPath;
+ }
+
public void setCreateConnector(Boolean flag) {
createConnector = flag;
}
+ public Boolean getCreateConnector() {
+ return createConnector;
+ }
+
public void setUsePlatformMBeanServer(Boolean flag) {
usePlatformMBeanServer = flag;
}
+ public Boolean getUsePlatformMBeanServer() {
+ return usePlatformMBeanServer;
+ }
+
+ public void setServer(MBeanServer value) {
+ server = value;
+ }
+
public MBeanServer getMBeanServer() {
return server;
}
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
public void register(Object obj, ObjectName name) throws JMException {
register(obj, name, false);
}
@@ -238,7 +281,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Registered MBean with objectname: " + registeredName);
}
-
+
mbeans.add(registeredName);
}
}
@@ -295,7 +338,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Found MBeanServer with default domain " + server.getDefaultDomain());
}
-
+
if (mBeanServerDefaultDomain.equals(server.getDefaultDomain())) {
return server;
}
@@ -316,8 +359,7 @@
}
// Create an RMI connector and start it
- JMXServiceURL url;
-
+ final JMXServiceURL url;
if (connectorPort > 0) {
url = new JMXServiceURL("service:jmx:rmi://" + host + ":" + connectorPort + "/jndi/rmi://" + host
+ ":" + registryPort + serviceUrlPath);
@@ -325,10 +367,22 @@
url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + registryPort
+ serviceUrlPath);
}
+
cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, server);
- // Start the connector server asynchronously (in a separate thread).
- Thread connectorThread = new Thread() {
+ if (executorService == null) {
+ // we only need a single for the JMX connector
+ executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, getThreadName("Camel JMXConnector: " + url));
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ }
+
+ // execute the JMX connector
+ executorService.execute(new Runnable() {
public void run() {
try {
cs.start();
@@ -336,18 +390,9 @@
LOG.warn("Could not start JMXConnector thread.", ioe);
}
}
- };
- connectorThread.setName("Camel JMX Connector Thread [" + url + "]");
- connectorThread.start();
- LOG.info("JMX Connector thread started and listening at: " + url);
- }
-
- public String getMBeanObjectDomainName() {
- return mBeanObjectDomainName;
- }
+ });
- public void setServer(MBeanServer value) {
- server = value;
+ LOG.info("JMX Connector thread started and listening at: " + url);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Wed Mar 18 10:47:49 2009
@@ -48,14 +48,15 @@
public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
- private static Timer timer = new Timer();
- private final Processor output;
+ // we can use a single shared static timer for async redeliveries
+ private static final Timer REDELIVER_TIMER = new Timer("Camel DeadLetterChannel Redeliver Timer", true);
private final Processor deadLetter;
private final String deadLetterUri;
+ private final Processor output;
private final AsyncProcessor outputAsync;
+ private final Processor redeliveryProcessor;
private RedeliveryPolicy redeliveryPolicy;
private Logger logger;
- private final Processor redeliveryProcessor;
private class RedeliveryData {
int redeliveryCounter;
@@ -94,7 +95,7 @@
// only process if the exchange hasn't failed
// and it has not been handled by the error processor
if (exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
- // if we are redelivering then sleep before trying again
+ // deliver to async to process it
asyncProcess(exchange, callback, data);
} else {
callback.done(sync);
@@ -275,9 +276,9 @@
if (exchange.getException() != null) {
exchange.setException(null);
}
- // wait until we should redeliver
+ // wait until we should redeliver using a timer to avoid thread blocking
data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
- timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
+ REDELIVER_TIMER.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
// letting onRedeliver be executed
deliverToRedeliveryProcessor(exchange, callback, data);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Wed Mar 18 10:47:49 2009
@@ -99,7 +99,7 @@
}
public void start() {
- timer = new Timer("Stream Resequencer Timer");
+ timer = new Timer("Camel Stream Resequencer Timer", true);
}
/**
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java Wed Mar 18 10:47:49 2009
@@ -92,7 +92,8 @@
Integer counter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
int attempt = (counter == null) ? 1 : counter + 1;
if (attempt > 1) {
- assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(), "Timer-0");
+ assertEquals("Now we should use TimerThread to call the process", Thread.currentThread().getName(),
+ "Camel DeadLetterChannel Redeliver Timer");
}
if (attempt < failUntilAttempt) {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java?rev=755541&r1=755540&r2=755541&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java Wed Mar 18 10:47:49 2009
@@ -60,11 +60,11 @@
// START SNIPPET: example
// we need to normalize two types of incoming messages
from("direct:start")
- .choice()
- .when().xpath("/employee").to("bean:normalizer?method=employeeToPerson")
- .when().xpath("/customer").to("bean:normalizer?method=customerToPerson")
- .end()
- .to("mock:result");
+ .choice()
+ .when().xpath("/employee").to("bean:normalizer?method=employeeToPerson")
+ .when().xpath("/customer").to("bean:normalizer?method=customerToPerson")
+ .end()
+ .to("mock:result");
// END SNIPPET: example
}
};