You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/13 09:50:09 UTC

svn commit: r1408638 - in /camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms: ./ taskmanager/ tx/

Author: davsclaus
Date: Tue Nov 13 08:50:08 2012
New Revision: 1408638

URL: http://svn.apache.org/viewvc?rev=1408638&view=rev
Log:
Removed evil singleton. Polished code.

Removed:
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManagerFactory.java
Modified:
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Tue Nov 13 08:50:08 2012
@@ -17,7 +17,6 @@
 package org.apache.camel.component.sjms;
 
 import java.util.Map;
-
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelException;
@@ -26,7 +25,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
-import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -35,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Represents the component that manages {@link SimpleJmsEndpoint}.
+ * The <a href="http://camel.apache.org/sjms">Simple JMS</a> component.
  */
 public class SjmsComponent extends DefaultComponent implements HeaderFilterStrategyAware {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(SjmsComponent.class);
@@ -46,17 +45,8 @@ public class SjmsComponent extends Defau
     private KeyFormatStrategy keyFormatStrategy;
     private Integer connectionCount = 1;
     private TransactionCommitStrategy transactionCommitStrategy;
+    private TimedTaskManager timedTaskManager;
 
-    /**
-     * @see
-     * org.apache.camel.impl.DefaultComponent#createEndpoint(java.lang.String,
-     * java.lang.String, java.util.Map)
-     * @param uri The value passed into our call to create an endpoint
-     * @param remaining
-     * @param parameters
-     * @return
-     * @throws Exception
-     */
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         validateMepAndReplyTo(parameters);
@@ -80,7 +70,7 @@ public class SjmsComponent extends Defau
      * @return String
      * @throws Exception
      */
-    private String normalizeUri(String uri) throws Exception {
+    private static String normalizeUri(String uri) throws Exception {
         String tempUri = uri;
         String endpointName = tempUri.substring(0, tempUri.indexOf(":"));
         tempUri = tempUri.substring(endpointName.length());
@@ -93,7 +83,7 @@ public class SjmsComponent extends Defau
         }
         if (ObjectHelper.isEmpty(protocol)) {
             protocol = "queue";
-        } else if (protocol.equals("queue") || protocol.equals("topic")) {
+        } else if (protocol != null && (protocol.equals("queue") || protocol.equals("topic"))) {
             tempUri = tempUri.substring(protocol.length() + 1);
         } else {
             throw new Exception("Unsupported Protocol: " + protocol);
@@ -114,14 +104,14 @@ public class SjmsComponent extends Defau
      * @throws Exception throws a {@link CamelException} when MEP equals InOnly
      *             and namedReplyTo is defined.
      */
-    private void validateMepAndReplyTo(Map<String, Object> parameters) throws Exception {
+    private static void validateMepAndReplyTo(Map<String, Object> parameters) throws Exception {
         boolean namedReplyToSet = parameters.containsKey("namedReplyTo");
         boolean mepSet = parameters.containsKey("exchangePattern");
         if (namedReplyToSet && mepSet) {
             if (!parameters.get("exchangePattern").equals(ExchangePattern.InOut.toString())) {
                 String namedReplyTo = (String)parameters.get("namedReplyTo");
                 ExchangePattern mep = ExchangePattern.valueOf((String)parameters.get("exchangePattern"));
-                throw new CamelException("Setting parameter namedReplyTo=" + namedReplyTo + " requires a MEP of type InOut.  Parameter exchangePattern is set to " + mep);
+                throw new CamelException("Setting parameter namedReplyTo=" + namedReplyTo + " requires a MEP of type InOut. Parameter exchangePattern is set to " + mep);
             }
         }
     }
@@ -130,9 +120,11 @@ public class SjmsComponent extends Defau
     protected void doStart() throws Exception {
         super.doStart();
 
-        LOGGER.debug("Verify ConnectionResource");
+        timedTaskManager = new TimedTaskManager();
+
+        LOGGER.trace("Verify ConnectionResource");
         if (getConnectionResource() == null) {
-            LOGGER.debug("No ConnectionResource provided.  Initialize the ConnectionFactoryResource.");
+            LOGGER.debug("No ConnectionResource provided. Initialize the ConnectionFactoryResource.");
             // We always use a connection pool, even for a pool of 1
             ConnectionFactoryResource connections = new ConnectionFactoryResource(getConnectionCount(), getConnectionFactory());
             connections.fillPool();
@@ -144,7 +136,8 @@ public class SjmsComponent extends Defau
 
     @Override
     protected void doStop() throws Exception {
-        TimedTaskManagerFactory.getInstance().cancelTasks();
+        timedTaskManager.cancelTasks();
+
         if (getConnectionResource() != null) {
             if (getConnectionResource() instanceof ConnectionFactoryResource) {
                 ((ConnectionFactoryResource)getConnectionResource()).drainPool();
@@ -156,9 +149,6 @@ public class SjmsComponent extends Defau
     /**
      * Sets the ConnectionFactory value of connectionFactory for this instance
      * of SjmsComponent.
-     * 
-     * @param connectionFactory Sets ConnectionFactory, default is TODO add
-     *            default
      */
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
@@ -221,11 +211,16 @@ public class SjmsComponent extends Defau
     /**
      * Sets the TransactionCommitStrategy value of transactionCommitStrategy for this
      * instance of SjmsComponent.
-     * 
-     * @param transactionCommitStrategy Sets TransactionCommitStrategy, default is TODO add
-     *            default
      */
     public void setTransactionCommitStrategy(TransactionCommitStrategy commitStrategy) {
         this.transactionCommitStrategy = commitStrategy;
     }
+
+    public TimedTaskManager getTimedTaskManager() {
+        return timedTaskManager;
+    }
+
+    public void setTimedTaskManager(TimedTaskManager timedTaskManager) {
+        this.timedTaskManager = timedTaskManager;
+    }
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java Tue Nov 13 08:50:08 2012
@@ -33,6 +33,7 @@ import org.apache.camel.component.sjms.j
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.jms.ObjectPool;
 import org.apache.camel.component.sjms.jms.SessionPool;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
 import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy;
 import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
 import org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization;
@@ -62,9 +63,6 @@ public class SjmsConsumer extends Defaul
          * Creates a new MessageConsumerResources instance.
          *
          * @see org.apache.camel.component.sjms.jms.ObjectPool#createObject()
-         *
-         * @return
-         * @throws Exception
          */
         @Override
         protected MessageConsumerResources createObject() throws Exception {
@@ -81,9 +79,6 @@ public class SjmsConsumer extends Defaul
          * Cleans up the MessageConsumerResources.
          *
          * @see org.apache.camel.component.sjms.jms.ObjectPool#destroyObject(java.lang.Object)
-         *
-         * @param model
-         * @throws Exception
          */
         @Override
         protected void destroyObject(MessageConsumerResources model) throws Exception {
@@ -112,19 +107,12 @@ public class SjmsConsumer extends Defaul
         private final Session session;
         private final MessageConsumer messageConsumer;
 
-        /**
-         * @param messageProducer
-         */
         public MessageConsumerResources(MessageConsumer messageConsumer) {
             super();
             this.session = null;
             this.messageConsumer = messageConsumer;
         }
 
-        /**
-         * @param session
-         * @param messageProducer
-         */
         public MessageConsumerResources(Session session, MessageConsumer messageConsumer) {
             super();
             this.session = session;
@@ -158,6 +146,11 @@ public class SjmsConsumer extends Defaul
     }
 
     @Override
+    public SjmsEndpoint getEndpoint() {
+        return (SjmsEndpoint) super.getEndpoint();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         super.doStart();
         consumers = new MessageConsumerPool();
@@ -173,24 +166,9 @@ public class SjmsConsumer extends Defaul
         }
     }
 
-    @Override
-    protected void doResume() throws Exception {
-        super.doResume();
-        doStart();
-    }
-
-    @Override
-    protected void doSuspend() throws Exception {
-        doStop();
-        super.doSuspend();
-    }
-
     /**
      * Creates a {@link MessageConsumerResources} with a dedicated
      * {@link Session} required for transacted and InOut consumers.
-     * 
-     * @return MessageConsumerResources
-     * @throws Exception
      */
     private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception {
         Connection conn = getConnectionResource().borrowConnection();
@@ -210,9 +188,6 @@ public class SjmsConsumer extends Defaul
     /**
      * Creates a {@link MessageConsumerResources} with a shared {@link Session}
      * for non-transacted InOnly consumers.
-     * 
-     * @return
-     * @throws Exception
      */
     private MessageConsumerResources createConsumerListener() throws Exception {
         Session queueSession = getSessionPool().borrowObject();
@@ -233,7 +208,7 @@ public class SjmsConsumer extends Defaul
      * Helper factory method used to create a MessageListener based on the MEP
      * 
      * @param session a session is only required if we are a transacted consumer
-     * @return
+     * @return the listener
      */
     protected MessageListener createMessageHandler(Session session) {
 
@@ -245,15 +220,16 @@ public class SjmsConsumer extends Defaul
         } else {
             commitStrategy = new DefaultTransactionCommitStrategy();
         }
-        
-        Synchronization synchronization = null;
+
+        Synchronization synchronization;
         if (commitStrategy instanceof BatchTransactionCommitStrategy) {
-            synchronization = new SessionBatchTransactionSynchronization(session, commitStrategy, getTransactionBatchTimeout());
+            TimedTaskManager timedTaskManager = getEndpoint().getComponent().getTimedTaskManager();
+            synchronization = new SessionBatchTransactionSynchronization(timedTaskManager, session, commitStrategy, getTransactionBatchTimeout());
         } else {
             synchronization = new SessionTransactionSynchronization(session, commitStrategy);
         }
 
-        AbstractMessageHandler messageHandler = null;
+        AbstractMessageHandler messageHandler;
         if (getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
             if (isTransacted()) {
                 messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, synchronization);
@@ -338,9 +314,7 @@ public class SjmsConsumer extends Defaul
     }
 
     /**
-     * Sets the JMS Message selector syntax.
-     * 
-     * @param messageSelector Message selector syntax or null
+     * Gets the JMS Message selector syntax.
      */
     public String getMessageSelector() {
         return getSjmsEndpoint().getMessageSelector();

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java Tue Nov 13 08:50:08 2012
@@ -69,11 +69,16 @@ public class SjmsEndpoint extends Defaul
         } else if (getEndpointUri().indexOf("://topic:") > -1) {
             topic = true;
         } else {
-            throw new RuntimeCamelException("Endpoint URI unsupported: " + uri);
+            throw new IllegalArgumentException("Endpoint URI unsupported: " + uri);
         }
     }
 
     @Override
+    public SjmsComponent getComponent() {
+        return (SjmsComponent) super.getComponent();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         super.doStart();
 

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/taskmanager/TimedTaskManager.java Tue Nov 13 08:50:08 2012
@@ -28,9 +28,9 @@ import java.util.concurrent.locks.Reentr
 public class TimedTaskManager {
 
     private final Timer timer = new Timer();
-    private ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    TimedTaskManager() {
+    public TimedTaskManager() {
     }
 
     public void addTask(TimerTask task, long delay) {

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java?rev=1408638&r1=1408637&r2=1408638&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionBatchTransactionSynchronization.java Tue Nov 13 08:50:08 2012
@@ -24,28 +24,27 @@ import javax.jms.Session;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sjms.TransactionCommitStrategy;
-import org.apache.camel.component.sjms.taskmanager.TimedTaskManagerFactory;
+import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
 import org.apache.camel.spi.Synchronization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * SessionTransactionSynchronization is called at the completion of each
- * {@link org.apache.camel.Exhcnage}.
+ * {@link org.apache.camel.Exchange}.
  */
 public class SessionBatchTransactionSynchronization implements Synchronization {
-    private Logger log = LoggerFactory.getLogger(getClass());
+    private static final Logger LOG = LoggerFactory.getLogger(SessionBatchTransactionSynchronization.class);
     private Session session;
     private final TransactionCommitStrategy commitStrategy;
     private long batchTransactionTimeout = 5000;
     private TimeoutTask currentTask;
     private ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final TimedTaskManager timedTaskManager;
 
-    public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) {
-        this(session, commitStrategy, 5000);
-    }
-
-    public SessionBatchTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) {
+    public SessionBatchTransactionSynchronization(TimedTaskManager timedTaskManager,
+                                                  Session session, TransactionCommitStrategy commitStrategy, long batchTransactionTimeout) {
+        this.timedTaskManager = timedTaskManager;
         this.session = session;
         if (commitStrategy == null) {
             this.commitStrategy = new DefaultTransactionCommitStrategy();
@@ -58,43 +57,39 @@ public class SessionBatchTransactionSync
         }
     }
 
-    /**
-     * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
-     * @param exchange
-     */
     @Override
     public void onFailure(Exchange exchange) {
         try {
             lock.readLock().lock();
             if (commitStrategy.rollback(exchange)) {
-                log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+                }
                 if (session != null && session.getTransacted()) {
                     session.rollback();
                 }
             }
         } catch (Exception e) {
-            log.warn("Failed to rollback the session: {}", e.getMessage());
+            LOG.warn("Failed to rollback the session: " + e.getMessage() + ". This exception will be ignored.", e);
         } finally {
             lock.readLock().unlock();
         }
     }
 
-    /**
-     * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange)
-     * @param exchange
-     */
     @Override
     public void onComplete(Exchange exchange) {
         try {
             lock.readLock().lock();
             if (commitStrategy.commit(exchange)) {
-                log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+                }
                 if (session != null && session.getTransacted()) {
                     session.commit();
                 }
             }
         } catch (Exception e) {
-            log.warn("Failed to commit the session: {}", e.getMessage());
+            LOG.warn("Failed to commit the session: " + e.getMessage() + ". This exception will be ignored.", e);
             exchange.setException(e);
         } finally {
             lock.readLock().unlock();
@@ -119,17 +114,12 @@ public class SessionBatchTransactionSync
         } finally {
             lock.writeLock().unlock();
         }
-        TimedTaskManagerFactory.getInstance().addTask(currentTask, batchTransactionTimeout);
+        timedTaskManager.addTask(currentTask, batchTransactionTimeout);
     }
 
-    public class TimeoutTask extends TimerTask {
+    public final class TimeoutTask extends TimerTask {
 
-        /**
-         * Default constructor
-         * 
-         * @param str
-         */
-        TimeoutTask() {
+        private TimeoutTask() {
         }
 
         /**
@@ -137,17 +127,17 @@ public class SessionBatchTransactionSync
          * transaction.
          */
         public void run() {
-            log.info("Batch Transaction Timer expired:");
+            LOG.debug("Batch Transaction Timer expired");
             try {
                 lock.writeLock().lock();
-                log.debug("Committing the current transactions");
+                LOG.trace("Committing the current transactions");
                 try {
                     if (session != null && session.getTransacted()) {
                         session.commit();
                     }
                     ((BatchTransactionCommitStrategy)commitStrategy).reset();
                 } catch (Exception e) {
-                    log.warn("Failed to commit the session during timeout: {}", e.getMessage());
+                    LOG.warn("Failed to commit the session during timeout: " + e.getMessage() + ". This exception will be ignored.", e);
                 }
             } finally {
                 lock.writeLock().unlock();
@@ -156,9 +146,7 @@ public class SessionBatchTransactionSync
 
         @Override
         public boolean cancel() {
-            if (log.isTraceEnabled()) {
-                log.trace("Cancelling the TimeoutTask");
-            }
+            LOG.trace("Cancelling the TimeoutTask");
             return super.cancel();
         }
     }