You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/05/24 17:27:01 UTC

svn commit: r541323 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ components/camel-jms/src/main/java/org/apache/camel/component/jms/

Author: jstrachan
Date: Thu May 24 08:27:00 2007
New Revision: 541323

URL: http://svn.apache.org/viewvc?view=rev&rev=541323
Log:
added support for a pull style (synchronous receive) based consumers which can make some transactional uses easier (start a txn, pull a message, commit)

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Thu May 24 08:27:00 2007
@@ -70,9 +70,20 @@
     Producer<E> createProducer() throws Exception;
 
     /**
-     * Creates a new consumer which consumes messages from the endpoint using the given processor
+     * Creates a new <a href="http://activemq.apache.org/camel/event-driven-consumer.html">event based consumer</>
+     * which consumes messages from the endpoint using the given processor
      *
      * @return a newly created consumer
      */
     Consumer<E> createConsumer(Processor processor) throws Exception;
+
+    /**
+     * Creates a new <a href="http://activemq.apache.org/camel/polling-consumer.html">Pull Consumer</a>
+     * so that the caller can pull message exchanges from the consumer
+     * when it wishes (rather than using the event driven consumer returned by {@link #createConsumer(Processor)}
+     *
+     * @return a newly created pull consumer
+     * @throws Exception if the pull consumer could not be created
+     */
+    PullConsumer<E> createPullConsumer() throws Exception;
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java Thu May 24 08:27:00 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Represents a <a href="http://activemq.apache.org/camel/polling-consumer.html">Polling Consumer</a> where the caller
+ * pulls messages when it is ready.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public interface PullConsumer<E extends Exchange> extends Consumer<E> {
+    
+    /**
+     * Attempts to receive a message exchange immediately without waiting
+     * or returning null if a message exchange is not available yet.
+     *
+     * @return
+     */
+    E receiveNoWait();
+
+    E receive();
+
+    E receive(long timeout);
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Thu May 24 08:27:00 2007
@@ -37,7 +37,6 @@
         this.processor = processor;
     }
 
-    
     @Override
 	public String toString() {
 		return "Consumer on " + endpoint;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Thu May 24 08:27:00 2007
@@ -20,13 +20,11 @@
 import org.apache.camel.Component;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.Service;
+import org.apache.camel.PullConsumer;
 import org.apache.camel.util.ObjectHelper;
 
-import java.lang.reflect.TypeVariable;
-import java.lang.reflect.Type;
 import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -101,6 +99,10 @@
         this.executorService = executorService;
     }
 
+    public PullConsumer<E> createPullConsumer() throws Exception {
+        return new DefaultPullConsumer<E>(this);
+    }
+
     /**
      * Converts the given exchange to the specified exchange type
      */
@@ -147,7 +149,7 @@
         return null;
     }
 
-  protected ScheduledThreadPoolExecutor createExecutorService() {
+    protected ScheduledThreadPoolExecutor createExecutorService() {
         return new ScheduledThreadPoolExecutor(10);
     }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java Thu May 24 08:27:00 2007
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.PullConsumer;
+import org.apache.camel.processor.Logger;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A default implementation of the {@link PullConsumer} which uses the normal asynchronous consumer mechanism
+ * along with a {@link BlockingQueue} to allow the caller to pull messages on demand.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class DefaultPullConsumer<E extends Exchange> extends PullConsumerSupport<E> implements Processor {
+    private static final transient Log log = LogFactory.getLog(DefaultPullConsumer.class);
+    private BlockingQueue<E> queue;
+    private ExceptionHandler interuptedExceptionHandler = new LoggingExceptionHandler(new Logger(log));
+    private Consumer<E> consumer;
+
+    public DefaultPullConsumer(Endpoint<E> endpoint) {
+        this(endpoint, new ArrayBlockingQueue<E>(1000));
+    }
+
+    public DefaultPullConsumer(Endpoint<E> endpoint, BlockingQueue<E> queue) {
+        super(endpoint);
+        this.queue = queue;
+    }
+
+    public E receiveNoWait() {
+        return receive(0);
+    }
+
+    public E receive() {
+        while (!isStopping() && !isStopped()) {
+            try {
+                return queue.take();
+            }
+            catch (InterruptedException e) {
+                handleInteruptedException(e);
+            }
+        }
+        return null;
+    }
+
+    public E receive(long timeout) {
+        try {
+            return queue.poll(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e) {
+            handleInteruptedException(e);
+            return null;
+        }
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        queue.offer((E) exchange);
+    }
+
+    public ExceptionHandler getInteruptedExceptionHandler() {
+        return interuptedExceptionHandler;
+    }
+
+    public void setInteruptedExceptionHandler(ExceptionHandler interuptedExceptionHandler) {
+        this.interuptedExceptionHandler = interuptedExceptionHandler;
+    }
+
+    protected void handleInteruptedException(InterruptedException e) {
+        getInteruptedExceptionHandler().handleException(e);
+    }
+
+    protected void doStart() throws Exception {
+        // lets add ourselves as a consumer
+        consumer = getEndpoint().createConsumer(this);
+        consumer.start();
+    }
+
+    protected void doStop() throws Exception {
+        if (consumer != null) {
+            try {
+                consumer.stop();
+            }
+            finally {
+                consumer = null;
+            }
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java Thu May 24 08:27:00 2007
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PullConsumer;
+import org.apache.camel.spi.ExceptionHandler;
+
+/**
+ * A useful base class for implementations of {@link PullConsumer}
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class PullConsumerSupport<E extends Exchange> extends ServiceSupport implements PullConsumer<E> {
+    private final Endpoint<E> endpoint;
+    private ExceptionHandler exceptionHandler;
+
+    public PullConsumerSupport(Endpoint<E> endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public String toString() {
+        return "PullConsumer on " + endpoint;
+    }
+
+    public Endpoint<E> getEndpoint() {
+        return endpoint;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    /**
+     * Handles the given exception using the {@link #getExceptionHandler()}
+     *
+     * @param t the exception to handle
+     */
+    protected void handleException(Throwable t) {
+        getExceptionHandler().handleException(t);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu May 24 08:27:00 2007
@@ -21,6 +21,7 @@
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.PullConsumer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.JmsTemplate;
@@ -46,7 +47,7 @@
     }
 
     public JmsProducer createProducer() throws Exception {
-        JmsOperations template = configuration.createJmsOperations(pubSubDomain, destination);
+        JmsOperations template = createJmsOperations();
         return createProducer(template);
     }
 
@@ -84,6 +85,12 @@
         return new JmsConsumer(this, processor, listenerContainer);
     }
 
+    @Override
+    public PullConsumer<JmsExchange> createPullConsumer() throws Exception {
+        JmsOperations template = createJmsOperations();
+        return new JmsPullConsumer(this, template);
+    }
+
     public JmsExchange createExchange() {
         return new JmsExchange(getContext(), getBinding());
     }
@@ -132,5 +139,10 @@
 	public boolean isSingleton() {
 		return false;
 	}
+
+
+    protected JmsOperations createJmsOperations() {
+        return configuration.createJmsOperations(pubSubDomain, destination);
+    }
 
 }

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java Thu May 24 08:27:00 2007
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.impl.PullConsumerSupport;
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
+
+import javax.jms.Message;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class JmsPullConsumer extends PullConsumerSupport<JmsExchange> {
+    private JmsOperations template;
+
+    public JmsPullConsumer(JmsEndpoint endpoint, JmsOperations template) {
+        super(endpoint);
+        this.template = template;
+    }
+
+    @Override
+    public JmsEndpoint getEndpoint() {
+        return (JmsEndpoint) super.getEndpoint();
+    }
+
+    public JmsExchange receiveNoWait() {
+        return receive(0);
+    }
+
+    public JmsExchange receive() {
+        return receive(-1);
+    }
+
+    public JmsExchange receive(long timeout) {
+        setReceiveTimeout(timeout);
+        Message message = template.receive();
+        if (message != null) {
+            return getEndpoint().createExchange(message);
+        }
+        return null;
+    }
+
+    protected void doStart() throws Exception {
+    }
+
+    protected void doStop() throws Exception {
+    }
+
+    protected void setReceiveTimeout(long timeout) {
+        if (template instanceof JmsTemplate) {
+            JmsTemplate jmsTemplate = (JmsTemplate) template;
+            jmsTemplate.setReceiveTimeout(timeout);
+        }
+        else if (template instanceof JmsTemplate102) {
+            JmsTemplate102 jmsTemplate102 = (JmsTemplate102) template;
+            jmsTemplate102.setReceiveTimeout(timeout);
+        }
+        else {
+            throw new IllegalArgumentException("Cannot set the receiveTimeout property on unknown JmsOperations type: " + template);
+        }
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native