You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/08/07 20:37:00 UTC

svn commit: r563609 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/camel/ src/test/java/org/apache/activemq/camel/ src/test/resources/org/apache/activemq/camel/

Author: jstrachan
Date: Tue Aug  7 11:36:58 2007
New Revision: 563609

URL: http://svn.apache.org/viewvc?view=rev&rev=563609
Log:
migrated the Camel destination code from the camel project into ActiveMQ as it makes more sense to host it here - and avoids a circular dependency issue when releasing ActiveMQ 5.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/package.html
      - copied unchanged from r563442, activemq/camel/trunk/components/camel-activemq/src/main/java/org/apache/camel/jms/package.html
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java
      - copied, changed from r563442, activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml   (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=563609&r1=563608&r2=563609
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Aug  7 11:36:58 2007
@@ -39,36 +39,37 @@
     <!-- =============================== -->
     <dependency>
       <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-      <optional>false</optional>
+      <artifactId>commons-logging-api</artifactId>
     </dependency>
-
     <dependency>
-      <groupId>${pom.groupId}</groupId>
-      <artifactId>activeio-core</artifactId>
-      <optional>false</optional>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>${pom.groupId}</groupId>
-      <artifactId>activeio-core</artifactId>
-      <optional>false</optional>
-      <type>test-jar</type>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-jms_1.1_spec</artifactId>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activeio-core</artifactId>
       <optional>false</optional>
     </dependency>
 
+
     <!-- =============================== -->
-    <!-- Optional Dependencies -->
+    <!-- Optional Dependencies           -->
     <!-- =============================== -->
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-jaas</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-jms</artifactId>
+      <optional>true</optional>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
@@ -84,43 +85,28 @@
       <artifactId>geronimo-j2ee-jacc_1.0_spec</artifactId>
       <optional>true</optional>
     </dependency>
-
-    <!-- commons -->
-    <dependency>
-      <groupId>commons-collections</groupId>
-      <artifactId>commons-collections</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-primitives</groupId>
-      <artifactId>commons-primitives</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>commons-pool</groupId>
       <artifactId>commons-pool</artifactId>
       <optional>true</optional>
     </dependency>
 
+    <!-- for XML parsing -->
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.derby</groupId>
-      <artifactId>derby</artifactId>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-spring</artifactId>
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>axion</groupId>
-      <artifactId>axion</artifactId>
-      <scope>test</scope>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring</artifactId>
+      <optional>true</optional>
     </dependency>
+
     <dependency>
-      <groupId>regexp</groupId>
-      <artifactId>regexp</artifactId>
-      <scope>test</scope>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <optional>true</optional>
     </dependency>
     <dependency>
       <groupId>activemq</groupId>
@@ -132,25 +118,74 @@
       <artifactId>xalan</artifactId>
       <optional>true</optional>
     </dependency>
+
+
+    <!--- not really a dependency at all - just added optionally to get the generator working -->
     <dependency>
-      <groupId>org.apache.xbean</groupId>
-      <artifactId>xbean-spring</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring</artifactId>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-openwire-generator</artifactId>
       <optional>true</optional>
     </dependency>
+
+    <!-- =============================== -->
+    <!-- Testing Dependencies            -->
+    <!-- =============================== -->
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-openwire-generator</artifactId>
-      <optional>true</optional>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activeio-core</artifactId>
+      <optional>false</optional>
+      <type>test-jar</type>
+    </dependency>
+
+    <!-- testing camel helpers -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-spring</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- database testing -->
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-primitives</groupId>
+      <artifactId>commons-primitives</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>axion</groupId>
+      <artifactId>axion</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>regexp</groupId>
+      <artifactId>regexp</artifactId>
+      <scope>test</scope>
     </dependency>
 
     <!--  LDAP tests -->

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+
+/**
+ * @version $Revision: $
+ */
+public class CamelConnection extends ActiveMQConnection implements CamelContextAware {
+
+    private CamelContext camelContext;
+
+    protected CamelConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
+        super(transport, clientIdGenerator, factoryStats);
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.spring.ActiveMQConnectionFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+
+/**
+ * A JMS ConnectionFactory which resolves non-JMS destinations or instances of
+ * {@link CamelDestination} to use the {@link CamelContext} to perform smart routing etc
+ *
+ * @version $Revision: $
+ */
+public class CamelConnectionFactory extends ActiveMQConnectionFactory implements CamelContextAware {
+    private CamelContext camelContext;
+
+    public CamelConnectionFactory() {
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    // Implementation methods
+    //-----------------------------------------------------------------------
+    protected CamelConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
+        CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(), stats);
+        CamelContext context = getCamelContext();
+        if (context != null) {
+            connection.setCamelContext(context);
+        }
+        return connection;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.CustomDestination;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.jms.JmsBinding;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+/**
+ * @version $Revision: $
+ */
+public class CamelDestination implements CustomDestination, CamelContextAware {
+    private String uri;
+    private Endpoint endpoint;
+    private CamelContext camelContext;
+    private JmsBinding binding = new JmsBinding();
+
+    public CamelDestination() {
+    }
+
+    public CamelDestination(String uri) {
+        this.uri = uri;
+    }
+
+    public String toString() {
+        return uri.toString();
+    }
+
+    // CustomDestination interface
+    //-----------------------------------------------------------------------
+    public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector) {
+        return createConsumer(session, messageSelector, false);
+    }
+
+    public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal) {
+        return new CamelMessageConsumer(this, resolveEndpoint(session), session, messageSelector, noLocal);
+    }
+
+    public TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal) {
+        return createDurableSubscriber(session, null, messageSelector, noLocal);
+    }
+
+    public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
+        throw new UnsupportedOperationException("This destination is not a Topic: " + this);
+    }
+
+    public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
+        throw new UnsupportedOperationException("This destination is not a Queue: " + this);
+    }
+
+    // Producers
+    //-----------------------------------------------------------------------
+    public MessageProducer createProducer(ActiveMQSession session) throws JMSException {
+        return new CamelMessageProducer(this, resolveEndpoint(session), session);
+    }
+
+    public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
+        throw new UnsupportedOperationException("This destination is not a Topic: " + this);
+    }
+
+    public QueueSender createSender(ActiveMQSession session) throws JMSException {
+        throw new UnsupportedOperationException("This destination is not a Queue: " + this);
+    }
+
+    // Properties
+    //-----------------------------------------------------------------------
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public void setEndpoint(Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public JmsBinding getBinding() {
+        return binding;
+    }
+
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
+    }
+
+    // Implementation methods
+    //-----------------------------------------------------------------------
+
+    /**
+     * Resolves the Camel Endpoint for this destination
+     *
+     * @return
+     */
+    protected Endpoint resolveEndpoint(ActiveMQSession session) {
+        Endpoint answer = getEndpoint();
+        if (answer == null) {
+            answer = resolveCamelContext(session).getEndpoint(getUri());
+            if (answer == null) {
+                throw new IllegalArgumentException("No endpoint could be found for URI: " + getUri());
+            }
+        }
+        return answer;
+    }
+
+    protected CamelContext resolveCamelContext(ActiveMQSession session) {
+        CamelContext answer = getCamelContext();
+        if (answer == null) {
+            ActiveMQConnection connection = session.getConnection();
+            if (connection instanceof CamelConnection) {
+                CamelConnection camelConnection = (CamelConnection) connection;
+                answer = camelConnection.getCamelContext();
+            }
+        }
+        if (answer == null) {
+            throw new IllegalArgumentException("No CamelContext has been configured");
+        }
+        return answer;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+/**
+ * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from a
+ * Camel {@link Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelMessageConsumer implements MessageConsumer {
+    private final CamelDestination destination;
+    private final Endpoint endpoint;
+    private final ActiveMQSession session;
+    private final String messageSelector;
+    private final boolean noLocal;
+    private MessageListener messageListener;
+    private Consumer consumer;
+    private PollingConsumer pollingConsumer;
+    private boolean closed;
+
+    public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
+        this.destination = destination;
+        this.endpoint = endpoint;
+        this.session = session;
+        this.messageSelector = messageSelector;
+        this.noLocal = noLocal;
+    }
+
+    public void close() throws JMSException {
+        if (!closed) {
+            closed = true;
+            try {
+                if (consumer != null) {
+                    consumer.stop();
+                }
+                if (pollingConsumer != null) {
+                    pollingConsumer.stop();
+                }
+            }
+            catch (JMSException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+    }
+
+    public MessageListener getMessageListener() throws JMSException {
+        return messageListener;
+    }
+
+    public void setMessageListener(MessageListener messageListener) throws JMSException {
+        this.messageListener = messageListener;
+        if (messageListener != null && consumer == null) {
+            consumer = createConsumer();
+        }
+    }
+
+    public Message receive() throws JMSException {
+        Exchange exchange = getPollingConsumer().receive();
+        return createMessage(exchange);
+    }
+
+    public Message receive(long timeoutMillis) throws JMSException {
+        Exchange exchange = getPollingConsumer().receive(timeoutMillis);
+        return createMessage(exchange);
+    }
+
+    public Message receiveNoWait() throws JMSException {
+        Exchange exchange = getPollingConsumer().receiveNoWait();
+        return createMessage(exchange);
+    }
+
+    // Properties
+    //-----------------------------------------------------------------------
+
+    public CamelDestination getDestination() {
+        return destination;
+    }
+
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public String getMessageSelector() {
+        return messageSelector;
+    }
+
+    public boolean isNoLocal() {
+        return noLocal;
+    }
+
+    public ActiveMQSession getSession() {
+        return session;
+    }
+
+    // Implementation methods
+    //-----------------------------------------------------------------------
+
+    protected PollingConsumer getPollingConsumer() throws JMSException {
+        try {
+            if (pollingConsumer == null) {
+                pollingConsumer = endpoint.createPollingConsumer();
+                pollingConsumer.start();
+            }
+            return pollingConsumer;
+        }
+        catch (JMSException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    protected Message createMessage(Exchange exchange) throws JMSException {
+        if (exchange != null) {
+            Message message = destination.getBinding().makeJmsMessage(exchange, session);
+            return message;
+        }
+        else {
+            return null;
+        }
+    }
+
+    protected Consumer createConsumer() throws JMSException {
+        try {
+            Consumer answer = endpoint.createConsumer(new Processor() {
+                public void process(Exchange exchange) throws Exception {
+                    Message message = createMessage(exchange);
+                    getMessageListener().onMessage(message);
+                }
+            });
+            answer.start();
+            return answer;
+        }
+        catch (JMSException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    protected void checkClosed() throws javax.jms.IllegalStateException {
+        if (closed) {
+            throw new IllegalStateException("The producer is closed");
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQMessageProducerSupport;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+import org.apache.camel.component.jms.JmsExchange;
+import org.apache.camel.util.ObjectHelper;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a
+ * Camel {@link Endpoint}
+ * 
+ * @version $Revision: $
+ */
+public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
+    private final CamelDestination destination;
+    private final Endpoint endpoint;
+    protected Producer producer;
+    private boolean closed;
+
+    public CamelMessageProducer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
+        super(session);
+        this.destination = destination;
+        this.endpoint = endpoint;
+        try {
+            this.producer = endpoint.createProducer();
+        }
+        catch (JMSException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    public CamelDestination getDestination() throws JMSException {
+        return destination;
+    }
+
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public void close() throws JMSException {
+        if (!closed) {
+            closed = true;
+            try {
+                producer.stop();
+            }
+            catch (JMSException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+    }
+
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+    	CamelDestination camelDestination = null;
+        if (ObjectHelper.equals(destination, this.destination)) {
+            camelDestination = this.destination;
+        }
+        else {
+            // TODO support any CamelDestination?
+            throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination);
+        }
+        try {
+            JmsExchange exchange = new JmsExchange(endpoint.getContext(), camelDestination.getBinding(), message);
+            producer.process(exchange);
+        }
+        catch (JMSException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
+
+    protected void checkClosed() throws IllegalStateException {
+        if (closed) {
+            throw new IllegalStateException("The producer is closed");
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelMessageProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.QueueReceiver;
+
+/**
+ * A JMS {@link Queue} object which refers to a Camel endpoint
+ *
+ * @version $Revision: $
+ */
+public class CamelQueue extends CamelDestination implements Queue {
+
+    public CamelQueue(String uri) {
+        super(uri);
+    }
+
+    public String getQueueName() throws JMSException {
+        return getUri();
+    }
+
+    public QueueSender createSender(ActiveMQSession session) throws JMSException {
+        return new CamelQueueSender(this, resolveEndpoint(session), session);
+    }
+    public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
+        return new CamelQueueReceiver(this, resolveEndpoint(session), session, messageSelector);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+/**
+ * A JMS {@link javax.jms.QueueReceiver} which consumes message exchanges from a
+ * Camel {@link org.apache.camel.Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelQueueReceiver extends CamelMessageConsumer implements QueueReceiver {
+
+    public CamelQueueReceiver(CamelQueue destination, Endpoint endpoint, ActiveMQSession session, String name) {
+        super(destination, endpoint, session, null, false);
+    }
+
+    /**
+     * Gets the <CODE>Queue</CODE> associated with this queue receiver.
+     *
+     * @return this receiver's <CODE>Queue</CODE>
+     * @throws JMSException if the JMS provider fails to get the queue for this queue
+     *                      receiver due to some internal error.
+     */
+
+    public Queue getQueue() throws JMSException {
+        checkClosed();
+        return (Queue) super.getDestination();
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+/**
+ * A JMS {@link javax.jms.QueueSender} which sends message exchanges to a
+ * Camel {@link org.apache.camel.Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelQueueSender extends CamelMessageProducer implements QueueSender {
+
+    public CamelQueueSender(CamelQueue destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
+        super(destination, endpoint, session);
+    }
+
+
+    /**
+     * Gets the queue associated with this <CODE>QueueSender</CODE>.
+     *
+     * @return this sender's queue
+     * @throws JMSException if the JMS provider fails to get the queue for this
+     *                      <CODE>QueueSender</CODE> due to some internal error.
+     */
+
+    public Queue getQueue() throws JMSException {
+        return (Queue) super.getDestination();
+    }
+
+    /**
+     * Sends a message to a queue for an unidentified message producer. Uses
+     * the <CODE>QueueSender</CODE>'s default delivery mode, priority, and
+     * time to live.
+     * <p/>
+     * <p/>
+     * Typically, a message producer is assigned a queue at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the queue be supplied every time a message is sent.
+     *
+     * @param queue   the queue to send this message to
+     * @param message the message to send
+     * @throws JMSException if the JMS provider fails to send the message due to some
+     *                      internal error.
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+
+    public void send(Queue queue, Message message) throws JMSException {
+        super.send(queue, message);
+    }
+
+    /**
+     * Sends a message to a queue for an unidentified message producer,
+     * specifying delivery mode, priority and time to live.
+     * <p/>
+     * <p/>
+     * Typically, a message producer is assigned a queue at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the queue be supplied every time a message is sent.
+     *
+     * @param queue        the queue to send this message to
+     * @param message      the message to send
+     * @param deliveryMode the delivery mode to use
+     * @param priority     the priority for this message
+     * @param timeToLive   the message's lifetime (in milliseconds)
+     * @throws JMSException if the JMS provider fails to send the message due to some
+     *                      internal error.
+     */
+
+    public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
+            throws JMSException {
+        super.send(queue,
+                message,
+                deliveryMode,
+                priority,
+                timeToLive);
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelQueueSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A JMS {@link javax.jms.Topic} object which refers to a Camel endpoint
+ *
+ * @version $Revision: $
+ */
+public class CamelTopic extends CamelDestination implements Topic {
+
+    public CamelTopic(String uri) {
+        super(uri);
+    }
+
+    public String getTopicName() throws JMSException {
+        return getUri();
+    }
+
+    public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
+        return new CamelTopicPublisher(this, resolveEndpoint(session), session);
+    }
+
+    public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
+        return new CamelTopicSubscriber(this, resolveEndpoint(session), session, name, messageSelector, noLocal);
+    }
+
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.TopicPublisher;
+import javax.jms.Topic;
+import javax.jms.Message;
+
+/**
+ * A JMS {@link javax.jms.TopicPublisher} which sends message exchanges to a
+ * Camel {@link Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelTopicPublisher extends CamelMessageProducer implements TopicPublisher {
+
+    public CamelTopicPublisher(CamelTopic destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
+        super(destination, endpoint, session);
+    }
+
+
+    /**
+     * Gets the topic associated with this <CODE>TopicPublisher</CODE>.
+     *
+     * @return this publisher's topic
+     * @throws JMSException if the JMS provider fails to get the topic for this
+     *                      <CODE>TopicPublisher</CODE> due to some internal error.
+     */
+
+    public Topic getTopic() throws JMSException {
+        return (Topic) super.getDestination();
+    }
+
+    /**
+     * Publishes a message to the topic. Uses the <CODE>TopicPublisher</CODE>'s
+     * default delivery mode, priority, and time to live.
+     *
+     * @param message the message to publish
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> with an invalid topic.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> that did not specify a topic at creation time.
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+
+    public void publish(Message message) throws JMSException {
+        super.send(message);
+    }
+
+    /**
+     * Publishes a message to the topic, specifying delivery mode, priority,
+     * and time to live.
+     *
+     * @param message      the message to publish
+     * @param deliveryMode the delivery mode to use
+     * @param priority     the priority for this message
+     * @param timeToLive   the message's lifetime (in milliseconds)
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> with an invalid topic.
+     * @throws java.lang.UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>TopicPublisher
+     *                                     </CODE> that did not specify a topic at creation time.
+     */
+
+    public void publish(Message message, int deliveryMode, int priority,
+                        long timeToLive) throws JMSException {
+        super.send(message, deliveryMode, priority, timeToLive);
+    }
+
+    /**
+     * Publishes a message to a topic for an unidentified message producer.
+     * Uses the <CODE>TopicPublisher</CODE>'s default delivery mode,
+     * priority, and time to live.
+     * <p/>
+     * <P>
+     * Typically, a message producer is assigned a topic at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the topic be supplied every time a message is published.
+     *
+     * @param topic   the topic to publish this message to
+     * @param message the message to publish
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
+     * @see javax.jms.MessageProducer#getDeliveryMode()
+     * @see javax.jms.MessageProducer#getTimeToLive()
+     * @see javax.jms.MessageProducer#getPriority()
+     */
+
+    public void publish(Topic topic, Message message) throws JMSException {
+        super.send(topic, message);
+    }
+
+    /**
+     * Publishes a message to a topic for an unidentified message producer,
+     * specifying delivery mode, priority and time to live.
+     * <p/>
+     * <P>
+     * Typically, a message producer is assigned a topic at creation time;
+     * however, the JMS API also supports unidentified message producers, which
+     * require that the topic be supplied every time a message is published.
+     *
+     * @param topic        the topic to publish this message to
+     * @param message      the message to publish
+     * @param deliveryMode the delivery mode to use
+     * @param priority     the priority for this message
+     * @param timeToLive   the message's lifetime (in milliseconds)
+     * @throws JMSException                if the JMS provider fails to publish the message due to
+     *                                     some internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
+     */
+
+    public void publish(Topic topic, Message message, int deliveryMode,
+                        int priority, long timeToLive) throws JMSException {
+        super.send(topic, message, deliveryMode, priority, timeToLive);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicPublisher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java Tue Aug  7 11:36:58 2007
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.camel.Endpoint;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * A JMS {@link javax.jms.TopicSubscriber} which consumes message exchanges from a
+ * Camel {@link Endpoint}
+ *
+ * @version $Revision: $
+ */
+public class CamelTopicSubscriber extends CamelMessageConsumer implements TopicSubscriber {
+
+    public CamelTopicSubscriber(CamelTopic destination, Endpoint endpoint, ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
+        super(destination, endpoint, session, messageSelector, noLocal);
+    }
+
+    /**
+     * Gets the <CODE>Topic</CODE> associated with this subscriber.
+     *
+     * @return this subscriber's <CODE>Topic</CODE>
+     * @throws javax.jms.JMSException if the JMS provider fails to get the topic for this topic
+     *                                subscriber due to some internal error.
+     */
+
+    public Topic getTopic() throws JMSException {
+        checkClosed();
+        return (Topic) super.getDestination();
+    }
+
+    /**
+     * Gets the <CODE>NoLocal</CODE> attribute for this subscriber. The
+     * default value for this attribute is false.
+     *
+     * @return true if locally published messages are being inhibited
+     * @throws JMSException if the JMS provider fails to get the <CODE>NoLocal
+     *                      </CODE> attribute for this topic subscriber due to some
+     *                      internal error.
+     */
+
+    public boolean getNoLocal() throws JMSException {
+        checkClosed();
+        return super.isNoLocal();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/CamelTopicSubscriber.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java (from r563442, activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java?view=diff&rev=563609&p1=activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java&r1=563442&p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java&r2=563609
==============================================================================
--- activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/jms/CamelJmsTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/CamelJmsTest.java Tue Aug  7 11:36:58 2007
@@ -14,8 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.jms;
+package org.apache.activemq.camel;
 
+import junit.framework.Assert;
 import org.apache.camel.CamelTemplate;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spring.SpringTestSupport;
@@ -78,10 +79,10 @@
         template.sendBody("seda:consumer", expectedBody);
 
         Message message = consumer.receive(5000);
-        assertNotNull("Should have received a message from destination: " + destination, message);
+        Assert.assertNotNull("Should have received a message from destination: " + destination, message);
 
         TextMessage textMessage = assertIsInstanceOf(TextMessage.class, message);
-        assertEquals("Message body", expectedBody, textMessage.getText());
+        Assert.assertEquals("Message body", expectedBody, textMessage.getText());
 
         log.info("Received message: " + message);
     }
@@ -91,6 +92,6 @@
     }
 
     protected ClassPathXmlApplicationContext createApplicationContext() {
-        return new ClassPathXmlApplicationContext("org/apache/camel/jms/spring.xml");
+        return new ClassPathXmlApplicationContext("org/apache/activemq/camel/spring.xml");
     }
-}
+}
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml?view=auto&rev=563609
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml Tue Aug  7 11:36:58 2007
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+       http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+    ">
+
+  <!-- START SNIPPET: example -->
+  <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+    <beanPostProcessor/>
+    
+  </camelContext>
+
+  <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
+    <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
+  </bean>
+
+  <bean id="sendTo" class="org.apache.activemq.camel.CamelDestination">
+    <property name="uri" value="mock:result"/>
+  </bean>
+
+  <bean id="consumeFrom" class="org.apache.activemq.camel.CamelDestination">
+    <property name="uri" value="seda:consumer"/>
+  </bean>
+
+  <bean id="camelTemplate" class="org.apache.camel.spring.CamelTemplateFactoryBean"/>
+  <!-- END SNIPPET: example -->
+
+<!--
+  <bean id="connectionFactory" class="org.apache.camel.jms.CamelConnectionFactory">
+    <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
+  </bean>
+-->
+
+</beans>

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/camel/spring.xml
------------------------------------------------------------------------------
    svn:eol-style = native