You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/08/07 20:37:00 UTC
svn commit: r563609 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/camel/
src/test/java/org/apache/activemq/camel/
src/test/resources/org/apache/activemq/camel/
Author: jstrachan
Date: Tue Aug 7 11:36:58 2007
New Revision: 563609
URL: http://svn.apache.org/viewvc?view=rev&rev=563609
Log:
migrated the Camel destination code from the camel project into ActiveMQ as it makes more sense to host it here - and avoids a circular dependency issue when releasing ActiveMQ 5.
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/package.html
- copied unchanged from r563442, activemq/camel/trunk/components/camel-activemq/src/main/java/org/apache/camel/jms/package.html
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java
- copied, changed from r563442, activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml (with props)
Modified:
activemq/trunk/activemq-core/pom.xml
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=563609&r1=563608&r2=563609
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Aug 7 11:36:58 2007
@@ -39,36 +39,37 @@
<!-- =============================== -->
<dependency>
<groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <optional>false</optional>
+ <artifactId>commons-logging-api</artifactId>
</dependency>
-
<dependency>
- <groupId>${pom.groupId}</groupId>
- <artifactId>activeio-core</artifactId>
- <optional>false</optional>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
</dependency>
<dependency>
- <groupId>${pom.groupId}</groupId>
- <artifactId>activeio-core</artifactId>
- <optional>false</optional>
- <type>test-jar</type>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>activeio-core</artifactId>
<optional>false</optional>
</dependency>
+
<!-- =============================== -->
- <!-- Optional Dependencies -->
+ <!-- Optional Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jaas</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jms</artifactId>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
@@ -84,43 +85,28 @@
<artifactId>geronimo-j2ee-jacc_1.0_spec</artifactId>
<optional>true</optional>
</dependency>
-
- <!-- commons -->
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-primitives</groupId>
- <artifactId>commons-primitives</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<optional>true</optional>
</dependency>
+ <!-- for XML parsing -->
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-spring</artifactId>
<optional>true</optional>
</dependency>
<dependency>
- <groupId>axion</groupId>
- <artifactId>axion</artifactId>
- <scope>test</scope>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring</artifactId>
+ <optional>true</optional>
</dependency>
+
<dependency>
- <groupId>regexp</groupId>
- <artifactId>regexp</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <optional>true</optional>
</dependency>
<dependency>
<groupId>activemq</groupId>
@@ -132,25 +118,74 @@
<artifactId>xalan</artifactId>
<optional>true</optional>
</dependency>
+
+
+ <!--- not really a dependency at all - just added optionally to get the generator working -->
<dependency>
- <groupId>org.apache.xbean</groupId>
- <artifactId>xbean-spring</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring</artifactId>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-openwire-generator</artifactId>
<optional>true</optional>
</dependency>
+
+ <!-- =============================== -->
+ <!-- Testing Dependencies -->
+ <!-- =============================== -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-openwire-generator</artifactId>
- <optional>true</optional>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>activeio-core</artifactId>
+ <optional>false</optional>
+ <type>test-jar</type>
+ </dependency>
+
+ <!-- testing camel helpers -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- database testing -->
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-primitives</groupId>
+ <artifactId>commons-primitives</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>axion</groupId>
+ <artifactId>axion</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>regexp</groupId>
+ <artifactId>regexp</artifactId>
+ <scope>test</scope>
</dependency>
<!-- LDAP tests -->
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+
+/**
+ * @version $Revision: $
+ */
+public class CamelConnection extends ActiveMQConnection implements CamelContextAware {
+
+ private CamelContext camelContext;
+
+ protected CamelConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
+ super(transport, clientIdGenerator, factoryStats);
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.spring.ActiveMQConnectionFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+
+/**
+ * A JMS ConnectionFactory which resolves non-JMS destinations or instances of
+ * {@link CamelDestination} to use the {@link CamelContext} to perform smart routing etc
+ *
+ * @version $Revision: $
+ */
+public class CamelConnectionFactory extends ActiveMQConnectionFactory implements CamelContextAware {
+ private CamelContext camelContext;
+
+ public CamelConnectionFactory() {
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ // Implementation methods
+ //-----------------------------------------------------------------------
+ protected CamelConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
+ CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(), stats);
+ CamelContext context = getCamelContext();
+ if (context != null) {
+ connection.setCamelContext(context);
+ }
+ return connection;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.CustomDestination;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.jms.JmsBinding;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+/**
+ * @version $Revision: $
+ */
+public class CamelDestination implements CustomDestination, CamelContextAware {
+ private String uri;
+ private Endpoint endpoint;
+ private CamelContext camelContext;
+ private JmsBinding binding = new JmsBinding();
+
+ public CamelDestination() {
+ }
+
+ public CamelDestination(String uri) {
+ this.uri = uri;
+ }
+
+ public String toString() {
+ return uri.toString();
+ }
+
+ // CustomDestination interface
+ //-----------------------------------------------------------------------
+ public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector) {
+ return createConsumer(session, messageSelector, false);
+ }
+
+ public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal) {
+ return new CamelMessageConsumer(this, resolveEndpoint(session), session, messageSelector, noLocal);
+ }
+
+ public TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal) {
+ return createDurableSubscriber(session, null, messageSelector, noLocal);
+ }
+
+ public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
+ throw new UnsupportedOperationException("This destination is not a Topic: " + this);
+ }
+
+ public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
+ throw new UnsupportedOperationException("This destination is not a Queue: " + this);
+ }
+
+ // Producers
+ //-----------------------------------------------------------------------
+ public MessageProducer createProducer(ActiveMQSession session) throws JMSException {
+ return new CamelMessageProducer(this, resolveEndpoint(session), session);
+ }
+
+ public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
+ throw new UnsupportedOperationException("This destination is not a Topic: " + this);
+ }
+
+ public QueueSender createSender(ActiveMQSession session) throws JMSException {
+ throw new UnsupportedOperationException("This destination is not a Queue: " + this);
+ }
+
+ // Properties
+ //-----------------------------------------------------------------------
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public void setEndpoint(Endpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ public JmsBinding getBinding() {
+ return binding;
+ }
+
+ public void setBinding(JmsBinding binding) {
+ this.binding = binding;
+ }
+
+ // Implementation methods
+ //-----------------------------------------------------------------------
+
+ /**
+ * Resolves the Camel Endpoint for this destination
+ *
+ * @return
+ */
+ protected Endpoint resolveEndpoint(ActiveMQSession session) {
+ Endpoint answer = getEndpoint();
+ if (answer == null) {
+ answer = resolveCamelContext(session).getEndpoint(getUri());
+ if (answer == null) {
+ throw new IllegalArgumentException("No endpoint could be found for URI: " + getUri());
+ }
+ }
+ return answer;
+ }
+
+ protected CamelContext resolveCamelContext(ActiveMQSession session) {
+ CamelContext answer = getCamelContext();
+ if (answer == null) {
+ ActiveMQConnection connection = session.getConnection();
+ if (connection instanceof CamelConnection) {
+ CamelConnection camelConnection = (CamelConnection) connection;
+ answer = camelConnection.getCamelContext();
+ }
+ }
+ if (answer == null) {
+ throw new IllegalArgumentException("No CamelContext has been configured");
+ }
+ return answer;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+/**
+ * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from a
+ * Camel {@link Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelMessageConsumer implements MessageConsumer {
+ private final CamelDestination destination;
+ private final Endpoint endpoint;
+ private final ActiveMQSession session;
+ private final String messageSelector;
+ private final boolean noLocal;
+ private MessageListener messageListener;
+ private Consumer consumer;
+ private PollingConsumer pollingConsumer;
+ private boolean closed;
+
+ public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
+ this.destination = destination;
+ this.endpoint = endpoint;
+ this.session = session;
+ this.messageSelector = messageSelector;
+ this.noLocal = noLocal;
+ }
+
+ public void close() throws JMSException {
+ if (!closed) {
+ closed = true;
+ try {
+ if (consumer != null) {
+ consumer.stop();
+ }
+ if (pollingConsumer != null) {
+ pollingConsumer.stop();
+ }
+ }
+ catch (JMSException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+ }
+
+ public MessageListener getMessageListener() throws JMSException {
+ return messageListener;
+ }
+
+ public void setMessageListener(MessageListener messageListener) throws JMSException {
+ this.messageListener = messageListener;
+ if (messageListener != null && consumer == null) {
+ consumer = createConsumer();
+ }
+ }
+
+ public Message receive() throws JMSException {
+ Exchange exchange = getPollingConsumer().receive();
+ return createMessage(exchange);
+ }
+
+ public Message receive(long timeoutMillis) throws JMSException {
+ Exchange exchange = getPollingConsumer().receive(timeoutMillis);
+ return createMessage(exchange);
+ }
+
+ public Message receiveNoWait() throws JMSException {
+ Exchange exchange = getPollingConsumer().receiveNoWait();
+ return createMessage(exchange);
+ }
+
+ // Properties
+ //-----------------------------------------------------------------------
+
+ public CamelDestination getDestination() {
+ return destination;
+ }
+
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public String getMessageSelector() {
+ return messageSelector;
+ }
+
+ public boolean isNoLocal() {
+ return noLocal;
+ }
+
+ public ActiveMQSession getSession() {
+ return session;
+ }
+
+ // Implementation methods
+ //-----------------------------------------------------------------------
+
+ protected PollingConsumer getPollingConsumer() throws JMSException {
+ try {
+ if (pollingConsumer == null) {
+ pollingConsumer = endpoint.createPollingConsumer();
+ pollingConsumer.start();
+ }
+ return pollingConsumer;
+ }
+ catch (JMSException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ protected Message createMessage(Exchange exchange) throws JMSException {
+ if (exchange != null) {
+ Message message = destination.getBinding().makeJmsMessage(exchange, session);
+ return message;
+ }
+ else {
+ return null;
+ }
+ }
+
+ protected Consumer createConsumer() throws JMSException {
+ try {
+ Consumer answer = endpoint.createConsumer(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ Message message = createMessage(exchange);
+ getMessageListener().onMessage(message);
+ }
+ });
+ answer.start();
+ return answer;
+ }
+ catch (JMSException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ protected void checkClosed() throws javax.jms.IllegalStateException {
+ if (closed) {
+ throw new IllegalStateException("The producer is closed");
+ }
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQMessageProducerSupport;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+import org.apache.camel.component.jms.JmsExchange;
+import org.apache.camel.util.ObjectHelper;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a
+ * Camel {@link Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
+ private final CamelDestination destination;
+ private final Endpoint endpoint;
+ protected Producer producer;
+ private boolean closed;
+
+ public CamelMessageProducer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
+ super(session);
+ this.destination = destination;
+ this.endpoint = endpoint;
+ try {
+ this.producer = endpoint.createProducer();
+ }
+ catch (JMSException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ public CamelDestination getDestination() throws JMSException {
+ return destination;
+ }
+
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ public void close() throws JMSException {
+ if (!closed) {
+ closed = true;
+ try {
+ producer.stop();
+ }
+ catch (JMSException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+ }
+
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ CamelDestination camelDestination = null;
+ if (ObjectHelper.equals(destination, this.destination)) {
+ camelDestination = this.destination;
+ }
+ else {
+ // TODO support any CamelDestination?
+ throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination);
+ }
+ try {
+ JmsExchange exchange = new JmsExchange(endpoint.getContext(), camelDestination.getBinding(), message);
+ producer.process(exchange);
+ }
+ catch (JMSException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (closed) {
+ throw new IllegalStateException("The producer is closed");
+ }
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.QueueReceiver;
+
+/**
+ * A JMS {@link Queue} object which refers to a Camel endpoint
+ *
+ * @version $Revision: $
+ */
+public class CamelQueue extends CamelDestination implements Queue {
+
+ public CamelQueue(String uri) {
+ super(uri);
+ }
+
+ public String getQueueName() throws JMSException {
+ return getUri();
+ }
+
+ public QueueSender createSender(ActiveMQSession session) throws JMSException {
+ return new CamelQueueSender(this, resolveEndpoint(session), session);
+ }
+ public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
+ return new CamelQueueReceiver(this, resolveEndpoint(session), session, messageSelector);
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+/**
+ * A JMS {@link javax.jms.QueueReceiver} which consumes message exchanges from a
+ * Camel {@link org.apache.camel.Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelQueueReceiver extends CamelMessageConsumer implements QueueReceiver {
+
+ public CamelQueueReceiver(CamelQueue destination, Endpoint endpoint, ActiveMQSession session, String name) {
+ super(destination, endpoint, session, null, false);
+ }
+
+ /**
+ * Gets the <CODE>Queue</CODE> associated with this queue receiver.
+ *
+ * @return this receiver's <CODE>Queue</CODE>
+ * @throws JMSException if the JMS provider fails to get the queue for this queue
+ * receiver due to some internal error.
+ */
+
+ public Queue getQueue() throws JMSException {
+ checkClosed();
+ return (Queue) super.getDestination();
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+/**
+ * A JMS {@link javax.jms.QueueSender} which sends message exchanges to a
+ * Camel {@link org.apache.camel.Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelQueueSender extends CamelMessageProducer implements QueueSender {
+
+ public CamelQueueSender(CamelQueue destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
+ super(destination, endpoint, session);
+ }
+
+
+ /**
+ * Gets the queue associated with this <CODE>QueueSender</CODE>.
+ *
+ * @return this sender's queue
+ * @throws JMSException if the JMS provider fails to get the queue for this
+ * <CODE>QueueSender</CODE> due to some internal error.
+ */
+
+ public Queue getQueue() throws JMSException {
+ return (Queue) super.getDestination();
+ }
+
+ /**
+ * Sends a message to a queue for an unidentified message producer. Uses
+ * the <CODE>QueueSender</CODE>'s default delivery mode, priority, and
+ * time to live.
+ * <p/>
+ * <p/>
+ * Typically, a message producer is assigned a queue at creation time;
+ * however, the JMS API also supports unidentified message producers, which
+ * require that the queue be supplied every time a message is sent.
+ *
+ * @param queue the queue to send this message to
+ * @param message the message to send
+ * @throws JMSException if the JMS provider fails to send the message due to some
+ * internal error.
+ * @see javax.jms.MessageProducer#getDeliveryMode()
+ * @see javax.jms.MessageProducer#getTimeToLive()
+ * @see javax.jms.MessageProducer#getPriority()
+ */
+
+ public void send(Queue queue, Message message) throws JMSException {
+ super.send(queue, message);
+ }
+
+ /**
+ * Sends a message to a queue for an unidentified message producer,
+ * specifying delivery mode, priority and time to live.
+ * <p/>
+ * <p/>
+ * Typically, a message producer is assigned a queue at creation time;
+ * however, the JMS API also supports unidentified message producers, which
+ * require that the queue be supplied every time a message is sent.
+ *
+ * @param queue the queue to send this message to
+ * @param message the message to send
+ * @param deliveryMode the delivery mode to use
+ * @param priority the priority for this message
+ * @param timeToLive the message's lifetime (in milliseconds)
+ * @throws JMSException if the JMS provider fails to send the message due to some
+ * internal error.
+ */
+
+ public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ super.send(queue,
+ message,
+ deliveryMode,
+ priority,
+ timeToLive);
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A JMS {@link javax.jms.Topic} object which refers to a Camel endpoint
+ *
+ * @version $Revision: $
+ */
+public class CamelTopic extends CamelDestination implements Topic {
+
+ public CamelTopic(String uri) {
+ super(uri);
+ }
+
+ public String getTopicName() throws JMSException {
+ return getUri();
+ }
+
+ public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
+ return new CamelTopicPublisher(this, resolveEndpoint(session), session);
+ }
+
+ public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
+ return new CamelTopicSubscriber(this, resolveEndpoint(session), session, name, messageSelector, noLocal);
+ }
+
+
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.TopicPublisher;
+import javax.jms.Topic;
+import javax.jms.Message;
+
+/**
+ * A JMS {@link javax.jms.TopicPublisher} which sends message exchanges to a
+ * Camel {@link Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelTopicPublisher extends CamelMessageProducer implements TopicPublisher {
+
+ public CamelTopicPublisher(CamelTopic destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
+ super(destination, endpoint, session);
+ }
+
+
+ /**
+ * Gets the topic associated with this <CODE>TopicPublisher</CODE>.
+ *
+ * @return this publisher's topic
+ * @throws JMSException if the JMS provider fails to get the topic for this
+ * <CODE>TopicPublisher</CODE> due to some internal error.
+ */
+
+ public Topic getTopic() throws JMSException {
+ return (Topic) super.getDestination();
+ }
+
+ /**
+ * Publishes a message to the topic. Uses the <CODE>TopicPublisher</CODE>'s
+ * default delivery mode, priority, and time to live.
+ *
+ * @param message the message to publish
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> with an invalid topic.
+ * @throws java.lang.UnsupportedOperationException
+ * if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> that did not specify a topic at creation time.
+ * @see javax.jms.MessageProducer#getDeliveryMode()
+ * @see javax.jms.MessageProducer#getTimeToLive()
+ * @see javax.jms.MessageProducer#getPriority()
+ */
+
+ public void publish(Message message) throws JMSException {
+ super.send(message);
+ }
+
+ /**
+ * Publishes a message to the topic, specifying delivery mode, priority,
+ * and time to live.
+ *
+ * @param message the message to publish
+ * @param deliveryMode the delivery mode to use
+ * @param priority the priority for this message
+ * @param timeToLive the message's lifetime (in milliseconds)
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> with an invalid topic.
+ * @throws java.lang.UnsupportedOperationException
+ * if a client uses this method with a <CODE>TopicPublisher
+ * </CODE> that did not specify a topic at creation time.
+ */
+
+ public void publish(Message message, int deliveryMode, int priority,
+ long timeToLive) throws JMSException {
+ super.send(message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * Publishes a message to a topic for an unidentified message producer.
+ * Uses the <CODE>TopicPublisher</CODE>'s default delivery mode,
+ * priority, and time to live.
+ * <p/>
+ * <P>
+ * Typically, a message producer is assigned a topic at creation time;
+ * however, the JMS API also supports unidentified message producers, which
+ * require that the topic be supplied every time a message is published.
+ *
+ * @param topic the topic to publish this message to
+ * @param message the message to publish
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
+ * @see javax.jms.MessageProducer#getDeliveryMode()
+ * @see javax.jms.MessageProducer#getTimeToLive()
+ * @see javax.jms.MessageProducer#getPriority()
+ */
+
+ public void publish(Topic topic, Message message) throws JMSException {
+ super.send(topic, message);
+ }
+
+ /**
+ * Publishes a message to a topic for an unidentified message producer,
+ * specifying delivery mode, priority and time to live.
+ * <p/>
+ * <P>
+ * Typically, a message producer is assigned a topic at creation time;
+ * however, the JMS API also supports unidentified message producers, which
+ * require that the topic be supplied every time a message is published.
+ *
+ * @param topic the topic to publish this message to
+ * @param message the message to publish
+ * @param deliveryMode the delivery mode to use
+ * @param priority the priority for this message
+ * @param timeToLive the message's lifetime (in milliseconds)
+ * @throws JMSException if the JMS provider fails to publish the message due to
+ * some internal error.
+ * @throws javax.jms.MessageFormatException if an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
+ */
+
+ public void publish(Topic topic, Message message, int deliveryMode,
+ int priority, long timeToLive) throws JMSException {
+ super.send(topic, message, deliveryMode, priority, timeToLive);
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java Tue Aug 7 11:36:58 2007
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A JMS {@link javax.jms.TopicSubscriber} which consumes message exchanges from a
+ * Camel {@link Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelTopicSubscriber extends CamelMessageConsumer implements TopicSubscriber {
+
+ public CamelTopicSubscriber(CamelTopic destination, Endpoint endpoint, ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
+ super(destination, endpoint, session, messageSelector, noLocal);
+ }
+
+ /**
+ * Gets the <CODE>Topic</CODE> associated with this subscriber.
+ *
+ * @return this subscriber's <CODE>Topic</CODE>
+ * @throws javax.jms.JMSException if the JMS provider fails to get the topic for this topic
+ * subscriber due to some internal error.
+ */
+
+ public Topic getTopic() throws JMSException {
+ checkClosed();
+ return (Topic) super.getDestination();
+ }
+
+ /**
+ * Gets the <CODE>NoLocal</CODE> attribute for this subscriber. The
+ * default value for this attribute is false.
+ *
+ * @return true if locally published messages are being inhibited
+ * @throws JMSException if the JMS provider fails to get the <CODE>NoLocal
+ * </CODE> attribute for this topic subscriber due to some
+ * internal error.
+ */
+
+ public boolean getNoLocal() throws JMSException {
+ checkClosed();
+ return super.isNoLocal();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java (from r563442, activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java?view=diff&rev=563609&p1=activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java&r1=563442&p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java&r2=563609
==============================================================================
--- activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java Tue Aug 7 11:36:58 2007
@@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.jms;
+package org.apache.activemq.camel;
+import junit.framework.Assert;
import org.apache.camel.CamelTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.SpringTestSupport;
@@ -78,10 +79,10 @@
template.sendBody("seda:consumer", expectedBody);
Message message = consumer.receive(5000);
- assertNotNull("Should have received a message from destination: " + destination, message);
+ Assert.assertNotNull("Should have received a message from destination: " + destination, message);
TextMessage textMessage = assertIsInstanceOf(TextMessage.class, message);
- assertEquals("Message body", expectedBody, textMessage.getText());
+ Assert.assertEquals("Message body", expectedBody, textMessage.getText());
log.info("Received message: " + message);
}
@@ -91,6 +92,6 @@
}
protected ClassPathXmlApplicationContext createApplicationContext() {
- return new ClassPathXmlApplicationContext("org/apache/camel/jms/spring.xml");
+ return new ClassPathXmlApplicationContext("org/apache/activemq/camel/spring.xml");
}
-}
+}
\ No newline at end of file
Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml Tue Aug 7 11:36:58 2007
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+ ">
+
+ <!-- START SNIPPET: example -->
+ <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+ <beanPostProcessor/>
+
+ </camelContext>
+
+ <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
+ <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
+ </bean>
+
+ <bean id="sendTo" class="org.apache.activemq.camel.CamelDestination">
+ <property name="uri" value="mock:result"/>
+ </bean>
+
+ <bean id="consumeFrom" class="org.apache.activemq.camel.CamelDestination">
+ <property name="uri" value="seda:consumer"/>
+ </bean>
+
+ <bean id="camelTemplate" class="org.apache.camel.spring.CamelTemplateFactoryBean"/>
+ <!-- END SNIPPET: example -->
+
+<!--
+ <bean id="connectionFactory" class="org.apache.camel.jms.CamelConnectionFactory">
+ <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
+ </bean>
+-->
+
+</beans>
Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml
------------------------------------------------------------------------------
svn:eol-style = native