You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/13 09:50:09 UTC
svn commit: r1408638 - in
/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms:
./ taskmanager/ tx/
Author: davsclaus
Date: Tue Nov 13 08:50:08 2012
New Revision: 1408638
URL: http://svn.apache.org/viewvc?rev=1408638&view=rev
Log:
Removed evil singleton. Polished code.
Removed:
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java
Modified:
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Tue Nov 13 08:50:08 2012
@@ -17,7 +17,6 @@
package org.apache.camel.component.sjms;
import java.util.Map;
-
import javax.jms.ConnectionFactory;
import org.apache.camel.CamelException;
@@ -26,7 +25,7 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
import org.apache.camel.component.sjms.jms.ConnectionResource;
import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
-import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -35,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Represents the component that manages {@link SimpleJmsEndpoint}.
+ * The <a href="http://camel.apache.org/sjms">Simple JMS</a> component.
*/
public class SjmsComponent extends DefaultComponent implements HeaderFilterStrategyAware {
private static final transient Logger LOGGER = LoggerFactory.getLogger(SjmsComponent.class);
@@ -46,17 +45,8 @@ public class SjmsComponent extends Defau
private KeyFormatStrategy keyFormatStrategy;
private Integer connectionCount = 1;
private TransactionCommitStrategy transactionCommitStrategy;
+ private TimedTaskManager timedTaskManager;
- /**
- * @see
- * org.apache.camel.impl.DefaultComponent#createEndpoint(java.lang.String,
- * java.lang.String, java.util.Map)
- * @param uri The value passed into our call to create an endpoint
- * @param remaining
- * @param parameters
- * @return
- * @throws Exception
- */
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
validateMepAndReplyTo(parameters);
@@ -80,7 +70,7 @@ public class SjmsComponent extends Defau
* @return String
* @throws Exception
*/
- private String normalizeUri(String uri) throws Exception {
+ private static String normalizeUri(String uri) throws Exception {
String tempUri = uri;
String endpointName = tempUri.substring(0, tempUri.indexOf(":"));
tempUri = tempUri.substring(endpointName.length());
@@ -93,7 +83,7 @@ public class SjmsComponent extends Defau
}
if (ObjectHelper.isEmpty(protocol)) {
protocol = "queue";
- } else if (protocol.equals("queue") || protocol.equals("topic")) {
+ } else if (protocol != null && (protocol.equals("queue") || protocol.equals("topic"))) {
tempUri = tempUri.substring(protocol.length() + 1);
} else {
throw new Exception("Unsupported Protocol: " + protocol);
@@ -114,14 +104,14 @@ public class SjmsComponent extends Defau
* @throws Exception throws a {@link CamelException} when MEP equals InOnly
* and namedReplyTo is defined.
*/
- private void validateMepAndReplyTo(Map<String, Object> parameters) throws Exception {
+ private static void validateMepAndReplyTo(Map<String, Object> parameters) throws Exception {
boolean namedReplyToSet = parameters.containsKey("namedReplyTo");
boolean mepSet = parameters.containsKey("exchangePattern");
if (namedReplyToSet && mepSet) {
if (!parameters.get("exchangePattern").equals(ExchangePattern.InOut.toString())) {
String namedReplyTo = (String)parameters.get("namedReplyTo");
ExchangePattern mep = ExchangePattern.valueOf((String)parameters.get("exchangePattern"));
- throw new CamelException("Setting parameter namedReplyTo=" + namedReplyTo + " requires a MEP of type InOut. Parameter exchangePattern is set to " + mep);
+ throw new CamelException("Setting parameter namedReplyTo=" + namedReplyTo + " requires a MEP of type InOut. Parameter exchangePattern is set to " + mep);
}
}
}
@@ -130,9 +120,11 @@ public class SjmsComponent extends Defau
protected void doStart() throws Exception {
super.doStart();
- LOGGER.debug("Verify ConnectionResource");
+ timedTaskManager = new TimedTaskManager();
+
+ LOGGER.trace("Verify ConnectionResource");
if (getConnectionResource() == null) {
- LOGGER.debug("No ConnectionResource provided. Initialize the ConnectionFactoryResource.");
+ LOGGER.debug("No ConnectionResource provided. Initialize the ConnectionFactoryResource.");
// We always use a connection pool, even for a pool of 1
ConnectionFactoryResource connections = new ConnectionFactoryResource(getConnectionCount(), getConnectionFactory());
connections.fillPool();
@@ -144,7 +136,8 @@ public class SjmsComponent extends Defau
@Override
protected void doStop() throws Exception {
- TimedTaskManagerFactory.getInstance().cancelTasks();
+ timedTaskManager.cancelTasks();
+
if (getConnectionResource() != null) {
if (getConnectionResource() instanceof ConnectionFactoryResource) {
((ConnectionFactoryResource)getConnectionResource()).drainPool();
@@ -156,9 +149,6 @@ public class SjmsComponent extends Defau
/**
* Sets the ConnectionFactory value of connectionFactory for this instance
* of SjmsComponent.
- *
- * @param connectionFactory Sets ConnectionFactory, default is TODO add
- * default
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
@@ -221,11 +211,16 @@ public class SjmsComponent extends Defau
/**
* Sets the TransactionCommitStrategy value of transactionCommitStrategy for this
* instance of SjmsComponent.
- *
- * @param transactionCommitStrategy Sets TransactionCommitStrategy, default is TODO add
- * default
*/
public void setTransactionCommitStrategy(TransactionCommitStrategy commitStrategy) {
this.transactionCommitStrategy = commitStrategy;
}
+
+ public TimedTaskManager getTimedTaskManager() {
+ return timedTaskManager;
+ }
+
+ public void setTimedTaskManager(TimedTaskManager timedTaskManager) {
+ this.timedTaskManager = timedTaskManager;
+ }
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java Tue Nov 13 08:50:08 2012
@@ -33,6 +33,7 @@ import org.apache.camel.component.sjms.j
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
import org.apache.camel.component.sjms.jms.ObjectPool;
import org.apache.camel.component.sjms.jms.SessionPool;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy;
import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
import org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization;
@@ -62,9 +63,6 @@ public class SjmsConsumer extends Defaul
* Creates a new MessageConsumerResources instance.
*
* @see org.apache.camel.component.sjms.jms.ObjectPool#createObject()
- *
- * @return
- * @throws Exception
*/
@Override
protected MessageConsumerResources createObject() throws Exception {
@@ -81,9 +79,6 @@ public class SjmsConsumer extends Defaul
* Cleans up the MessageConsumerResources.
*
* @see org.apache.camel.component.sjms.jms.ObjectPool#destroyObject(java.lang.Object)
- *
- * @param model
- * @throws Exception
*/
@Override
protected void destroyObject(MessageConsumerResources model) throws Exception {
@@ -112,19 +107,12 @@ public class SjmsConsumer extends Defaul
private final Session session;
private final MessageConsumer messageConsumer;
- /**
- * @param messageProducer
- */
public MessageConsumerResources(MessageConsumer messageConsumer) {
super();
this.session = null;
this.messageConsumer = messageConsumer;
}
- /**
- * @param session
- * @param messageProducer
- */
public MessageConsumerResources(Session session, MessageConsumer messageConsumer) {
super();
this.session = session;
@@ -158,6 +146,11 @@ public class SjmsConsumer extends Defaul
}
@Override
+ public SjmsEndpoint getEndpoint() {
+ return (SjmsEndpoint) super.getEndpoint();
+ }
+
+ @Override
protected void doStart() throws Exception {
super.doStart();
consumers = new MessageConsumerPool();
@@ -173,24 +166,9 @@ public class SjmsConsumer extends Defaul
}
}
- @Override
- protected void doResume() throws Exception {
- super.doResume();
- doStart();
- }
-
- @Override
- protected void doSuspend() throws Exception {
- doStop();
- super.doSuspend();
- }
-
/**
* Creates a {@link MessageConsumerResources} with a dedicated
* {@link Session} required for transacted and InOut consumers.
- *
- * @return MessageConsumerResources
- * @throws Exception
*/
private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception {
Connection conn = getConnectionResource().borrowConnection();
@@ -210,9 +188,6 @@ public class SjmsConsumer extends Defaul
/**
* Creates a {@link MessageConsumerResources} with a shared {@link Session}
* for non-transacted InOnly consumers.
- *
- * @return
- * @throws Exception
*/
private MessageConsumerResources createConsumerListener() throws Exception {
Session queueSession = getSessionPool().borrowObject();
@@ -233,7 +208,7 @@ public class SjmsConsumer extends Defaul
* Helper factory method used to create a MessageListener based on the MEP
*
* @param session a session is only required if we are a transacted consumer
- * @return
+ * @return the listener
*/
protected MessageListener createMessageHandler(Session session) {
@@ -245,15 +220,16 @@ public class SjmsConsumer extends Defaul
} else {
commitStrategy = new DefaultTransactionCommitStrategy();
}
-
- Synchronization synchronization = null;
+
+ Synchronization synchronization;
if (commitStrategy instanceof BatchTransactionCommitStrategy) {
- synchronization = new SessionBatchTransactionSynchronization(session, commitStrategy, getTransactionBatchTimeout());
+ TimedTaskManager timedTaskManager = getEndpoint().getComponent().getTimedTaskManager();
+ synchronization = new SessionBatchTransactionSynchronization(timedTaskManager, session, commitStrategy, getTransactionBatchTimeout());
} else {
synchronization = new SessionTransactionSynchronization(session, commitStrategy);
}
- AbstractMessageHandler messageHandler = null;
+ AbstractMessageHandler messageHandler;
if (getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
if (isTransacted()) {
messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, synchronization);
@@ -338,9 +314,7 @@ public class SjmsConsumer extends Defaul
}
/**
- * Sets the JMS Message selector syntax.
- *
- * @param messageSelector Message selector syntax or null
+ * Gets the JMS Message selector syntax.
*/
public String getMessageSelector() {
return getSjmsEndpoint().getMessageSelector();
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java Tue Nov 13 08:50:08 2012
@@ -69,11 +69,16 @@ public class SjmsEndpoint extends Defaul
} else if (getEndpointUri().indexOf("://topic:") > -1) {
topic = true;
} else {
- throw new RuntimeCamelException("Endpoint URI unsupported: " + uri);
+ throw new IllegalArgumentException("Endpoint URI unsupported: " + uri);
}
}
@Override
+ public SjmsComponent getComponent() {
+ return (SjmsComponent) super.getComponent();
+ }
+
+ @Override
protected void doStart() throws Exception {
super.doStart();
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java Tue Nov 13 08:50:08 2012
@@ -28,9 +28,9 @@ import java.util.concurrent.locks.Reentr
public class TimedTaskManager {
private final Timer timer = new Timer();
- private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
- TimedTaskManager() {
+ public TimedTaskManager() {
}
public void addTask(TimerTask task, long delay) {
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java Tue Nov 13 08:50:08 2012
@@ -24,28 +24,27 @@ import javax.jms.Session;
import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.TransactionCommitStrategy;
-import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
import org.apache.camel.spi.Synchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SessionTransactionSynchronization is called at the completion of each
- * {@link org.apache.camel.Exhcnage}.
+ * {@link org.apache.camel.Exchange}.
*/
public class SessionBatchTransactionSynchronization implements Synchronization {
- private Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(SessionBatchTransactionSynchronization.class);
private Session session;
private final TransactionCommitStrategy commitStrategy;
private long batchTransactionTimeout = 5000;
private TimeoutTask currentTask;
private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final TimedTaskManager timedTaskManager;
- public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) {
- this(session, commitStrategy, 5000);
- }
-
- public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) {
+ public SessionBatchTransactionSynchronization(TimedTaskManager timedTaskManager,
+ Session session, TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) {
+ this.timedTaskManager = timedTaskManager;
this.session = session;
if (commitStrategy == null) {
this.commitStrategy = new DefaultTransactionCommitStrategy();
@@ -58,43 +57,39 @@ public class SessionBatchTransactionSync
}
}
- /**
- * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
- * @param exchange
- */
@Override
public void onFailure(Exchange exchange) {
try {
lock.readLock().lock();
if (commitStrategy.rollback(exchange)) {
- log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+ }
if (session != null && session.getTransacted()) {
session.rollback();
}
}
} catch (Exception e) {
- log.warn("Failed to rollback the session: {}", e.getMessage());
+ LOG.warn("Failed to rollback the session: " + e.getMessage() + ". This exception will be ignored.", e);
} finally {
lock.readLock().unlock();
}
}
- /**
- * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange)
- * @param exchange
- */
@Override
public void onComplete(Exchange exchange) {
try {
lock.readLock().lock();
if (commitStrategy.commit(exchange)) {
- log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+ }
if (session != null && session.getTransacted()) {
session.commit();
}
}
} catch (Exception e) {
- log.warn("Failed to commit the session: {}", e.getMessage());
+ LOG.warn("Failed to commit the session: " + e.getMessage() + ". This exception will be ignored.", e);
exchange.setException(e);
} finally {
lock.readLock().unlock();
@@ -119,17 +114,12 @@ public class SessionBatchTransactionSync
} finally {
lock.writeLock().unlock();
}
- TimedTaskManagerFactory.getInstance().addTask(currentTask, batchTransactionTimeout);
+ timedTaskManager.addTask(currentTask, batchTransactionTimeout);
}
- public class TimeoutTask extends TimerTask {
+ public final class TimeoutTask extends TimerTask {
- /**
- * Default constructor
- *
- * @param str
- */
- TimeoutTask() {
+ private TimeoutTask() {
}
/**
@@ -137,17 +127,17 @@ public class SessionBatchTransactionSync
* transaction.
*/
public void run() {
- log.info("Batch Transaction Timer expired:");
+ LOG.debug("Batch Transaction Timer expired");
try {
lock.writeLock().lock();
- log.debug("Committing the current transactions");
+ LOG.trace("Committing the current transactions");
try {
if (session != null && session.getTransacted()) {
session.commit();
}
((BatchTransactionCommitStrategy)commitStrategy).reset();
} catch (Exception e) {
- log.warn("Failed to commit the session during timeout: {}", e.getMessage());
+ LOG.warn("Failed to commit the session during timeout: " + e.getMessage() + ". This exception will be ignored.", e);
}
} finally {
lock.writeLock().unlock();
@@ -156,9 +146,7 @@ public class SessionBatchTransactionSync
@Override
public boolean cancel() {
- if (log.isTraceEnabled()) {
- log.trace("Cancelling the TimeoutTask");
- }
+ LOG.trace("Cancelling the TimeoutTask");
return super.cancel();
}
}