You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/05/24 17:27:01 UTC
svn commit: r541323 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
components/camel-jms/src/main/java/org/apache/camel/component/jms/
Author: jstrachan
Date: Thu May 24 08:27:00 2007
New Revision: 541323
URL: http://svn.apache.org/viewvc?view=rev&rev=541323
Log:
Added support for pull-style (synchronous receive) consumers, which can make some transactional uses easier (start a txn, pull a message, commit)
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Thu May 24 08:27:00 2007
@@ -70,9 +70,20 @@
Producer<E> createProducer() throws Exception;
/**
- * Creates a new consumer which consumes messages from the endpoint using the given processor
+ * Creates a new <a href="http://activemq.apache.org/camel/event-driven-consumer.html">event based consumer</a>
+ * which consumes messages from the endpoint using the given processor
*
* @return a newly created consumer
*/
Consumer<E> createConsumer(Processor processor) throws Exception;
+
+ /**
+ * Creates a new <a href="http://activemq.apache.org/camel/polling-consumer.html">Pull Consumer</a>
+ * so that the caller can pull message exchanges from the consumer
+ * when it wishes (rather than using the event driven consumer returned by {@link #createConsumer(Processor)})
+ *
+ * @return a newly created pull consumer
+ * @throws Exception if the pull consumer could not be created
+ */
+ PullConsumer<E> createPullConsumer() throws Exception;
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java Thu May 24 08:27:00 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Represents a <a href="http://activemq.apache.org/camel/polling-consumer.html">Polling Consumer</a> where the caller
+ * pulls messages when it is ready.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface PullConsumer<E extends Exchange> extends Consumer<E> {
+
+ /**
+ * Attempts to receive a message exchange immediately without waiting,
+ * returning null if a message exchange is not available yet.
+ *
+ * @return the message exchange, or null if none is available yet
+ */
+ E receiveNoWait();
+
+ E receive();
+
+ E receive(long timeout);
+
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/PullConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Thu May 24 08:27:00 2007
@@ -37,7 +37,6 @@
this.processor = processor;
}
-
@Override
public String toString() {
return "Consumer on " + endpoint;
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Thu May 24 08:27:00 2007
@@ -20,13 +20,11 @@
import org.apache.camel.Component;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.Service;
+import org.apache.camel.PullConsumer;
import org.apache.camel.util.ObjectHelper;
-import java.lang.reflect.TypeVariable;
-import java.lang.reflect.Type;
import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -101,6 +99,10 @@
this.executorService = executorService;
}
+ public PullConsumer<E> createPullConsumer() throws Exception {
+ return new DefaultPullConsumer<E>(this);
+ }
+
/**
* Converts the given exchange to the specified exchange type
*/
@@ -147,7 +149,7 @@
return null;
}
- protected ScheduledThreadPoolExecutor createExecutorService() {
+ protected ScheduledThreadPoolExecutor createExecutorService() {
return new ScheduledThreadPoolExecutor(10);
}
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java Thu May 24 08:27:00 2007
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.PullConsumer;
+import org.apache.camel.processor.Logger;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A default implementation of the {@link PullConsumer} which uses the normal asynchronous consumer mechanism
+ * along with a {@link BlockingQueue} to allow the caller to pull messages on demand.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class DefaultPullConsumer<E extends Exchange> extends PullConsumerSupport<E> implements Processor {
+ private static final transient Log log = LogFactory.getLog(DefaultPullConsumer.class);
+ private BlockingQueue<E> queue;
+ private ExceptionHandler interuptedExceptionHandler = new LoggingExceptionHandler(new Logger(log));
+ private Consumer<E> consumer;
+
+ public DefaultPullConsumer(Endpoint<E> endpoint) {
+ this(endpoint, new ArrayBlockingQueue<E>(1000));
+ }
+
+ public DefaultPullConsumer(Endpoint<E> endpoint, BlockingQueue<E> queue) {
+ super(endpoint);
+ this.queue = queue;
+ }
+
+ public E receiveNoWait() {
+ return receive(0);
+ }
+
+ public E receive() {
+ while (!isStopping() && !isStopped()) {
+ try {
+ return queue.take();
+ }
+ catch (InterruptedException e) {
+ handleInteruptedException(e);
+ }
+ }
+ return null;
+ }
+
+ public E receive(long timeout) {
+ try {
+ return queue.poll(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ handleInteruptedException(e);
+ return null;
+ }
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ queue.offer((E) exchange);
+ }
+
+ public ExceptionHandler getInteruptedExceptionHandler() {
+ return interuptedExceptionHandler;
+ }
+
+ public void setInteruptedExceptionHandler(ExceptionHandler interuptedExceptionHandler) {
+ this.interuptedExceptionHandler = interuptedExceptionHandler;
+ }
+
+ protected void handleInteruptedException(InterruptedException e) {
+ getInteruptedExceptionHandler().handleException(e);
+ }
+
+ protected void doStart() throws Exception {
+ // let's add ourselves as a consumer
+ consumer = getEndpoint().createConsumer(this);
+ consumer.start();
+ }
+
+ protected void doStop() throws Exception {
+ if (consumer != null) {
+ try {
+ consumer.stop();
+ }
+ finally {
+ consumer = null;
+ }
+ }
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPullConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java Thu May 24 08:27:00 2007
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PullConsumer;
+import org.apache.camel.spi.ExceptionHandler;
+
+/**
+ * A useful base class for implementations of {@link PullConsumer}
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class PullConsumerSupport<E extends Exchange> extends ServiceSupport implements PullConsumer<E> {
+ private final Endpoint<E> endpoint;
+ private ExceptionHandler exceptionHandler;
+
+ public PullConsumerSupport(Endpoint<E> endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public String toString() {
+ return "PullConsumer on " + endpoint;
+ }
+
+ public Endpoint<E> getEndpoint() {
+ return endpoint;
+ }
+
+ public ExceptionHandler getExceptionHandler() {
+ if (exceptionHandler == null) {
+ exceptionHandler = new LoggingExceptionHandler(getClass());
+ }
+ return exceptionHandler;
+ }
+
+ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ /**
+ * Handles the given exception using the {@link #getExceptionHandler()}
+ *
+ * @param t the exception to handle
+ */
+ protected void handleException(Throwable t) {
+ getExceptionHandler().handleException(t);
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PullConsumerSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=541323&r1=541322&r2=541323
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu May 24 08:27:00 2007
@@ -21,6 +21,7 @@
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.PullConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
@@ -46,7 +47,7 @@
}
public JmsProducer createProducer() throws Exception {
- JmsOperations template = configuration.createJmsOperations(pubSubDomain, destination);
+ JmsOperations template = createJmsOperations();
return createProducer(template);
}
@@ -84,6 +85,12 @@
return new JmsConsumer(this, processor, listenerContainer);
}
+ @Override
+ public PullConsumer<JmsExchange> createPullConsumer() throws Exception {
+ JmsOperations template = createJmsOperations();
+ return new JmsPullConsumer(this, template);
+ }
+
public JmsExchange createExchange() {
return new JmsExchange(getContext(), getBinding());
}
@@ -132,5 +139,10 @@
public boolean isSingleton() {
return false;
}
+
+
+ protected JmsOperations createJmsOperations() {
+ return configuration.createJmsOperations(pubSubDomain, destination);
+ }
}
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java?view=auto&rev=541323
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java Thu May 24 08:27:00 2007
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.impl.PullConsumerSupport;
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
+
+import javax.jms.Message;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class JmsPullConsumer extends PullConsumerSupport<JmsExchange> {
+ private JmsOperations template;
+
+ public JmsPullConsumer(JmsEndpoint endpoint, JmsOperations template) {
+ super(endpoint);
+ this.template = template;
+ }
+
+ @Override
+ public JmsEndpoint getEndpoint() {
+ return (JmsEndpoint) super.getEndpoint();
+ }
+
+ public JmsExchange receiveNoWait() {
+ return receive(0);
+ }
+
+ public JmsExchange receive() {
+ return receive(-1);
+ }
+
+ public JmsExchange receive(long timeout) {
+ setReceiveTimeout(timeout);
+ Message message = template.receive();
+ if (message != null) {
+ return getEndpoint().createExchange(message);
+ }
+ return null;
+ }
+
+ protected void doStart() throws Exception {
+ }
+
+ protected void doStop() throws Exception {
+ }
+
+ protected void setReceiveTimeout(long timeout) {
+ if (template instanceof JmsTemplate) {
+ JmsTemplate jmsTemplate = (JmsTemplate) template;
+ jmsTemplate.setReceiveTimeout(timeout);
+ }
+ else if (template instanceof JmsTemplate102) {
+ JmsTemplate102 jmsTemplate102 = (JmsTemplate102) template;
+ jmsTemplate102.setReceiveTimeout(timeout);
+ }
+ else {
+ throw new IllegalArgumentException("Cannot set the receiveTimeout property on unknown JmsOperations type: " + template);
+ }
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPullConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native