You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2014/09/09 17:53:46 UTC

git commit: added CamelRoutesBrokerPlugin for https://issues.apache.org/jira/browse/AMQ-5351

Repository: activemq
Updated Branches:
  refs/heads/trunk b2e6a4166 -> 7ca25965d


added CamelRoutesBrokerPlugin for https://issues.apache.org/jira/browse/AMQ-5351


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7ca25965
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7ca25965
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7ca25965

Branch: refs/heads/trunk
Commit: 7ca25965db5b9ffdfba11b914dafbb36a3504162
Parents: b2e6a41
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Sep 9 16:52:21 2014 +0100
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Sep 9 16:53:28 2014 +0100

----------------------------------------------------------------------
 .../camel/camelplugin/CamelRoutesBroker.java    | 294 +++++++++++++++++++
 .../camelplugin/CamelRoutesBrokerPlugin.java    |  67 +++++
 .../camelplugin/CamelPluginConfigTest.java      | 127 ++++++++
 .../camel/camelplugin/camel-routes-activemq.xml |  29 ++
 .../activemq/camel/camelplugin/routes.xml       |  22 ++
 activemq-spring/pom.xml                         |   1 +
 6 files changed, 540 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java
new file mode 100644
index 0000000..4d5bbb2
--- /dev/null
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.camel.camelplugin;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.spring.Utils;
+import org.apache.activemq.usage.Usage;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RoutesDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.Resource;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A StatisticsBroker You can retrieve a Map Message for a Destination - or
+ * Broker containing statistics as key-value pairs The message must contain a
+ * replyTo Destination - else its ignored
+ *
+ */
+public class CamelRoutesBroker extends BrokerFilter {
+    private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBroker.class);
+    private String routesFile = "";
+    private int checkPeriod = 1000;
+    private Resource theRoutes;
+    private DefaultCamelContext camelContext;
+    private long lastRoutesModified = -1;
+    private CountDownLatch countDownLatch;
+
+    /**
+     * Overide methods to pause the broker whilst camel routes are loaded
+     */
+    @Override
+    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        super.send(producerExchange, message);
+    }
+
+    @Override
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        super.acknowledge(consumerExchange, ack);
+    }
+
+    @Override
+    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        return super.messagePull(context, pull);
+    }
+
+    @Override
+    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
+        blockWhileLoadingCamelRoutes();
+        super.processConsumerControl(consumerExchange, control);
+    }
+
+    @Override
+    public void reapplyInterceptor() {
+        blockWhileLoadingCamelRoutes();
+        super.reapplyInterceptor();
+    }
+
+    @Override
+    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        super.beginTransaction(context, xid);
+    }
+
+    @Override
+    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        return super.prepareTransaction(context, xid);
+    }
+
+    @Override
+    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        super.rollbackTransaction(context, xid);
+    }
+
+    @Override
+    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        super.commitTransaction(context, xid, onePhase);
+    }
+
+    @Override
+    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
+        blockWhileLoadingCamelRoutes();
+        super.forgetTransaction(context, transactionId);
+    }
+
+    @Override
+    public void preProcessDispatch(MessageDispatch messageDispatch) {
+        blockWhileLoadingCamelRoutes();
+        super.preProcessDispatch(messageDispatch);
+    }
+
+    @Override
+    public void postProcessDispatch(MessageDispatch messageDispatch) {
+        blockWhileLoadingCamelRoutes();
+        super.postProcessDispatch(messageDispatch);
+    }
+
+    @Override
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
+        blockWhileLoadingCamelRoutes();
+        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
+    }
+
+    @Override
+    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
+        blockWhileLoadingCamelRoutes();
+        super.messageConsumed(context, messageReference);
+    }
+
+    @Override
+    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
+        blockWhileLoadingCamelRoutes();
+        super.messageDelivered(context, messageReference);
+    }
+
+    @Override
+    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
+        blockWhileLoadingCamelRoutes();
+        super.messageDiscarded(context, sub, messageReference);
+    }
+
+    @Override
+    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
+        blockWhileLoadingCamelRoutes();
+        super.isFull(context, destination, usage);
+    }
+
+    @Override
+    public void nowMasterBroker() {
+        blockWhileLoadingCamelRoutes();
+        super.nowMasterBroker();
+    }
+
+    /*
+     * Properties
+     */
+
+    public String getRoutesFile() {
+        return routesFile;
+    }
+
+    public void setRoutesFile(String routesFile) {
+        this.routesFile = routesFile;
+    }
+
+    public int getCheckPeriod() {
+        return checkPeriod;
+    }
+
+    public void setCheckPeriod(int checkPeriod) {
+        this.checkPeriod = checkPeriod;
+    }
+
+    public CamelRoutesBroker(Broker next) {
+        super(next);
+    }
+
+    @Override
+    public void start() throws Exception {
+        super.start();
+        LOG.info("Starting CamelRoutesBroker");
+
+        camelContext = new DefaultCamelContext();
+        camelContext.setName("EmbeddedCamel-" + getBrokerName());
+        camelContext.start();
+
+        getBrokerService().getScheduler().executePeriodically(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    loadCamelRoutes();
+                } catch (Throwable e) {
+                    LOG.error("Failed to load Camel Routes", e);
+                }
+
+            }
+        }, getCheckPeriod());
+    }
+
+
+
+    @Override
+    public void stop() throws Exception {
+        CountDownLatch latch = this.countDownLatch;
+        if (latch != null){
+            latch.countDown();
+        }
+        if (camelContext != null){
+            camelContext.stop();
+        }
+        super.stop();
+    }
+
+    private void loadCamelRoutes() throws Exception{
+        if (theRoutes == null) {
+            String fileToUse = getRoutesFile();
+            if (fileToUse == null || fileToUse.trim().isEmpty()) {
+                BrokerContext brokerContext = getBrokerService().getBrokerContext();
+                if (brokerContext != null) {
+                    String uri = brokerContext.getConfigurationUrl();
+                    Resource resource = Utils.resourceFromString(uri);
+                    if (resource.exists()) {
+                        fileToUse = resource.getFile().getParent();
+                        fileToUse += File.separator;
+                        fileToUse += "routes.xml";
+                    }
+                }
+            }
+            if (fileToUse != null && !fileToUse.isEmpty()){
+                theRoutes = Utils.resourceFromString(fileToUse);
+                setRoutesFile(theRoutes.getFile().getAbsolutePath());
+            }
+        }
+        if (!isStopped() && camelContext != null && theRoutes != null && theRoutes.exists()){
+            long lastModified = theRoutes.lastModified();
+            if (lastModified != lastRoutesModified){
+                CountDownLatch latch = new CountDownLatch(1);
+                this.countDownLatch = latch;
+                lastRoutesModified = lastModified;
+
+                List<RouteDefinition> currentRoutes = camelContext.getRouteDefinitions();
+                for (RouteDefinition rd:currentRoutes){
+                    camelContext.stopRoute(rd);
+                    camelContext.removeRouteDefinition(rd);
+                }
+                InputStream is = theRoutes.getInputStream();
+                RoutesDefinition routesDefinition = camelContext.loadRoutesDefinition(is);
+
+                for (RouteDefinition rd: routesDefinition.getRoutes()){
+                    camelContext.startRoute(rd);
+                }
+                is.close();
+                latch.countDown();
+                this.countDownLatch=null;
+            }
+
+
+        }
+    }
+
+    private void blockWhileLoadingCamelRoutes(){
+        CountDownLatch latch = this.countDownLatch;
+        if (latch != null){
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java
new file mode 100644
index 0000000..2e9833f
--- /dev/null
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.camel.camelplugin;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A CamelRoutesBrokerPlugin
+ *
+ * load camel routes dynamically from a routes.xml file located in same directory as ActiveMQ.xml
+ *
+ * @org.apache.xbean.XBean element="camelRoutesBrokerPlugin"
+ *
+ */
+public class CamelRoutesBrokerPlugin implements BrokerPlugin {
+    private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBrokerPlugin.class);
+    private String routesFile = "";
+    private int checkPeriod =1000;
+
+    public String getRoutesFile() {
+        return routesFile;
+    }
+
+    public void setRoutesFile(String routesFile) {
+        this.routesFile = routesFile;
+    }
+
+    public int getCheckPeriod() {
+        return checkPeriod;
+    }
+
+    public void setCheckPeriod(int checkPeriod) {
+        this.checkPeriod = checkPeriod;
+    }
+
+    /** 
+     * @param broker
+     * @return the plug-in
+     * @throws Exception
+     * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
+     */
+    public Broker installPlugin(Broker broker) throws Exception {
+        CamelRoutesBroker answer = new CamelRoutesBroker(broker);
+        answer.setCheckPeriod(getCheckPeriod());
+        answer.setRoutesFile(getRoutesFile());
+        LOG.info("Installing CamelRoutesBroker");
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java
new file mode 100644
index 0000000..5db3cc2
--- /dev/null
+++ b/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.camel.camelplugin;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.core.io.Resource;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class CamelPluginConfigTest {
+
+    protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/camelplugin/";
+    protected static final String TOPIC_NAME = "test.topic";
+    protected static final String QUEUE_NAME = "test.queue";
+
+    protected BrokerService brokerService;
+    protected ActiveMQConnectionFactory factory;
+    protected Connection producerConnection;
+    protected Connection consumerConnection;
+    protected Session consumerSession;
+    protected Session producerSession;
+
+    protected int messageCount = 1000;
+    protected int timeOutInSeconds = 10;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker(new FileSystemResource(CONF_ROOT + "camel-routes-activemq.xml"));
+
+        factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
+        consumerConnection = factory.createConnection();
+        consumerConnection.start();
+        producerConnection = factory.createConnection();
+        producerConnection.start();
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    protected BrokerService createBroker(String resource) throws Exception {
+        return createBroker(new ClassPathResource(resource));
+    }
+
+    protected BrokerService createBroker(Resource resource) throws Exception {
+
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService broker = factory.createBroker(resource.getURI());
+        return broker;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (producerConnection != null) {
+            producerConnection.close();
+        }
+        if (consumerConnection != null) {
+            consumerConnection.close();
+        }
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test
+    public void testReRouteAll() throws Exception {
+        Thread.sleep(2000);
+        final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
+
+        Topic topic = consumerSession.createTopic(TOPIC_NAME);
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                try {
+                    latch.countDown();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        MessageProducer producer = producerSession.createProducer(topic);
+
+        for (int i = 0; i < messageCount; i++) {
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml
new file mode 100644
index 0000000..a6be02b
--- /dev/null
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <broker xmlns="http://activemq.apache.org/schema/core"  persistent="false">
+    <plugins>
+      <camelRoutesBrokerPlugin checkPeriod="100" />
+    </plugins>
+  </broker>
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml
new file mode 100644
index 0000000..d44ae06
--- /dev/null
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml
@@ -0,0 +1,22 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<routes xmlns="http://camel.apache.org/schema/spring">
+    <route id="test">
+        <from uri="broker:topic:test.topic"/>
+        <to uri="broker:queue:test.queue"/>
+    </route>
+</routes>

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ca25965/activemq-spring/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml
index afe1a69..6efa019 100755
--- a/activemq-spring/pom.xml
+++ b/activemq-spring/pom.xml
@@ -238,6 +238,7 @@
               <includes>
                 <include>${basedir}/../activemq-client/src/main/java</include>
                 <include>${basedir}/../activemq-broker/src/main/java</include>
+                <include>${basedir}/../activemq-camel/src/main/java</include>
                 <include>${basedir}/../activemq-leveldb-store/src/main/java</include>
                 <include>${basedir}/../activemq-jdbc-store/src/main/java</include>
                 <include>${basedir}/../activemq-kahadb-store/src/main/java</include>