You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/31 13:35:35 UTC

[1/3] git commit: CAMEL-6165: New camel-stomp component. Thanks to Dejan Bosanac for the contribution.

Updated Branches:
  refs/heads/master e7db4ab26 -> 64675d495


CAMEL-6165: New camel-stomp component. Thanks to Dejan Bosanac for the contribution.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2beac53a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2beac53a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2beac53a

Branch: refs/heads/master
Commit: 2beac53a41575cffce21967dd209154de6868a33
Parents: d936737
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 31 11:09:33 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 31 13:17:34 2013 +0200

----------------------------------------------------------------------
 components/camel-stomp/README.md                |  20 +++
 components/camel-stomp/pom.xml                  |  89 ++++++++++
 .../camel/component/stomp/StompComponent.java   |  56 +++++++
 .../component/stomp/StompConfiguration.java     |  61 +++++++
 .../camel/component/stomp/StompConsumer.java    |  60 +++++++
 .../camel/component/stomp/StompEndpoint.java    | 162 +++++++++++++++++++
 .../camel/component/stomp/StompProducer.java    |  43 +++++
 .../services/org/apache/camel/component/stomp   |  18 +++
 .../camel/component/stomp/StompBaseTest.java    |  46 ++++++
 .../component/stomp/StompConsumerTest.java      |  64 ++++++++
 .../component/stomp/StompProducerTest.java      |  83 ++++++++++
 .../src/test/resources/log4j.properties         |  40 +++++
 parent/pom.xml                                  |   1 +
 13 files changed, 743 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/README.md
----------------------------------------------------------------------
diff --git a/components/camel-stomp/README.md b/components/camel-stomp/README.md
new file mode 100644
index 0000000..8341d09
--- /dev/null
+++ b/components/camel-stomp/README.md
@@ -0,0 +1,20 @@
+camel-stomp
+===========
+
+Camel component used for communicating with [Stomp](http://stomp.github.io/) compliant message brokers, like [Apache ActiveMQ](http://activemq.apache.org) or [ActiveMQ Apollo](http://activemq.apache.org/apollo/).
+
+URI format
+----------
+
+    stomp:destination
+
+Where destination is broker specific. With ActiveMQ you can use queues and topics in the form of
+
+    stomp:queue:test
+
+Samples
+-------
+
+    from("direct:foo").to("stomp:queue:test")
+
+

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-stomp/pom.xml b/components/camel-stomp/pom.xml
new file mode 100644
index 0000000..c81fd89
--- /dev/null
+++ b/components/camel-stomp/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.12-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-stomp</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: Stomp</name>
+  <description>Camel Stomp client</description>
+
+  <properties>
+    <camel.osgi.export.pkg>org.apache.camel.component.stomp.*</camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=stomp</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.stompjms</groupId>
+      <artifactId>stompjms-client</artifactId>
+      <version>${stompjms-version}</version>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-stomp</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.thoughtworks.xstream</groupId>
+      <artifactId>xstream</artifactId>
+      <version>${xstream-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
new file mode 100644
index 0000000..165c24e
--- /dev/null
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Camel component that creates {@link StompEndpoint}s.
+ * <p/>
+ * Every endpoint created by this component is handed the component's own
+ * {@link StompConfiguration} instance (it is passed by reference, not copied).
+ */
+public class StompComponent extends DefaultComponent {
+
+    StompConfiguration configuration = new StompConfiguration();
+
+    /**
+     * Creates the endpoint for the given URI.
+     * <p/>
+     * The remaining part of the URI is turned into the destination by prefixing
+     * a slash and replacing each colon with a slash, so <tt>queue:test</tt>
+     * becomes <tt>/queue/test</tt>.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the URI part used to build the destination
+     * @param parameters the URI parameters, applied to the endpoint as properties
+     * @return the newly created endpoint
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        // plain character replacement; no regular expression is needed for a single colon
+        String path = "/".concat(remaining.replace(':', '/'));
+
+        StompEndpoint answer = new StompEndpoint(uri, this, getConfiguration(), path);
+        setProperties(answer, parameters);
+        return answer;
+    }
+
+    /**
+     * @return the configuration handed to the endpoints created by this component
+     */
+    public StompConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * @param configuration the configuration to hand to endpoints created from now on
+     */
+    public void setConfiguration(StompConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    // The setters below simply delegate to the current configuration.
+
+    public void setBrokerURL(String brokerURL) {
+        getConfiguration().setBrokerURL(brokerURL);
+    }
+
+    public void setLogin(String login) {
+        getConfiguration().setLogin(login);
+    }
+
+    public void setPasscode(String passcode) {
+        getConfiguration().setPasscode(passcode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
new file mode 100644
index 0000000..664215b
--- /dev/null
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.fusesource.stomp.client.Stomp;
+
+/**
+ * Holds the connection settings (broker URL, login and passcode) and lazily
+ * builds the {@link Stomp} client from them.
+ */
+public class StompConfiguration {
+
+    private String brokerURL = "tcp://localhost:61613";
+    private String login;
+    private String passcode;
+
+    // created on the first call to getStomp() and reused afterwards
+    private Stomp stomp;
+
+    public String getBrokerURL() {
+        return brokerURL;
+    }
+
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+    }
+
+    public String getLogin() {
+        return login;
+    }
+
+    public void setLogin(String login) {
+        this.login = login;
+    }
+
+    public String getPasscode() {
+        return passcode;
+    }
+
+    public void setPasscode(String passcode) {
+        this.passcode = passcode;
+    }
+
+    /**
+     * Returns the Stomp client, creating it from the current broker URL, login
+     * and passcode on the first call.
+     * <p/>
+     * The instance is cached, so values changed through the setters after the
+     * first call are not applied to the client returned here.
+     *
+     * @return the cached client
+     * @throws Exception if the client cannot be created
+     */
+    public Stomp getStomp() throws Exception {
+        if (stomp != null) {
+            return stomp;
+        }
+
+        stomp = new Stomp(brokerURL);
+        stomp.setLogin(login);
+        stomp.setPasscode(passcode);
+        return stomp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
new file mode 100644
index 0000000..a405e40
--- /dev/null
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+public class StompConsumer extends DefaultConsumer {
+
+    AsciiBuffer id;
+
+    public StompConsumer(Endpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        id = getEndpoint().getNextId();
+    }
+
+    @Override
+    public StompEndpoint getEndpoint() {
+        return (StompEndpoint) super.getEndpoint();
+    }
+
+    protected void doStart() throws Exception {
+        getEndpoint().addConsumer(this);
+        super.doStart();
+    }
+
+    protected void doStop() throws Exception {
+        getEndpoint().removeConsumer(this);
+        super.doStop();
+    }
+
+    void processExchange(Exchange exchange) {
+        try {
+            getProcessor().process(exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
new file mode 100644
index 0000000..623dea0
--- /dev/null
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.stomp.client.Callback;
+import org.fusesource.stomp.client.CallbackConnection;
+import org.fusesource.stomp.client.Promise;
+import org.fusesource.stomp.codec.StompFrame;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.DISCONNECT;
+import static org.fusesource.stomp.client.Constants.ID;
+import static org.fusesource.stomp.client.Constants.SEND;
+import static org.fusesource.stomp.client.Constants.SUBSCRIBE;
+import static org.fusesource.stomp.client.Constants.UNSUBSCRIBE;
+
+public class StompEndpoint extends DefaultEndpoint {
+
+    private CallbackConnection connection;
+    private StompConfiguration configuration;
+    private String destination;
+
+    private final List<StompConsumer> consumers = new CopyOnWriteArrayList<StompConsumer>();
+
+    public StompEndpoint(String uri, StompComponent component, StompConfiguration configuration, String destination) {
+        super(uri, component);
+        this.configuration = configuration;
+        this.destination = destination;
+    }
+
+    public Producer createProducer() throws Exception {
+        return new StompProducer(this);
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new StompConsumer(this, processor);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        final Promise<CallbackConnection> promise = new Promise<CallbackConnection>();
+
+        configuration.getStomp().connectCallback(promise);
+
+        connection = promise.await();
+
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                connection.receive(new Callback<StompFrame>() {
+                    @Override
+                    public void onFailure(Throwable value) {
+                        if (started.get()) {
+                            connection.close(null);
+                        }
+                    }
+
+                    @Override
+                    public void onSuccess(StompFrame value) {
+                        if (!consumers.isEmpty()) {
+                            Exchange exchange = createExchange();
+                            exchange.getIn().setBody(value.content());
+                            for (StompConsumer consumer : consumers) {
+                                consumer.processExchange(exchange);
+                            }
+                        }
+                    }
+                });
+                connection.resume();
+            }
+        });
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                StompFrame frame = new StompFrame(DISCONNECT);
+                connection.send(frame, null);
+            }
+        });
+        connection.close(null);
+    }
+
+    protected void send(Message message) {
+        final StompFrame frame = new StompFrame(SEND);
+        frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination));
+        frame.content(utf8(message.getBody().toString()));
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                connection.send(frame, null);
+            }
+        });
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        return super.createEndpointUri();
+    }
+
+    void addConsumer(final StompConsumer consumer) {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                StompFrame frame = new StompFrame(SUBSCRIBE);
+                frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination));
+                frame.addHeader(ID, consumer.id);
+                connection.send(frame, null);
+            }
+        });
+        consumers.add(consumer);
+    }
+
+    void removeConsumer(final StompConsumer consumer) {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                StompFrame frame = new StompFrame(UNSUBSCRIBE);
+                frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination));
+                frame.addHeader(ID, consumer.id);
+                connection.send(frame, null);
+            }
+        });
+        consumers.remove(consumer);
+    }
+
+    AsciiBuffer getNextId() {
+        return connection.nextId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java
new file mode 100644
index 0000000..74b8b63
--- /dev/null
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultAsyncProducer;
+
+/**
+ * Producer that hands the IN message of each exchange to its
+ * {@link StompEndpoint} for sending.
+ */
+public class StompProducer extends DefaultAsyncProducer implements Processor {
+
+    private final StompEndpoint stompEndpoint;
+
+    public StompProducer(StompEndpoint stompEndpoint) {
+        super(stompEndpoint);
+        this.stompEndpoint = stompEndpoint;
+    }
+
+    /**
+     * Sends the IN message and completes the callback on the calling thread.
+     *
+     * @param exchange the exchange whose IN message is sent
+     * @param callback notified with <tt>true</tt> once the send call has returned
+     * @return always <tt>true</tt>
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        sendCapturingFailure(exchange);
+        callback.done(true);
+        return true;
+    }
+
+    // Sends the IN message; an exception raised by the send call is stored on the exchange.
+    private void sendCapturingFailure(Exchange exchange) {
+        try {
+            stompEndpoint.send(exchange.getIn());
+        } catch (Exception cause) {
+            exchange.setException(cause);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp b/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp
new file mode 100644
index 0000000..ae04da8
--- /dev/null
+++ b/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.stomp.StompComponent
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
new file mode 100644
index 0000000..052ab75
--- /dev/null
+++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+/**
+ * Base class for the Stomp tests. Starts a non-persistent broker with a Stomp
+ * connector on <tt>localhost:61613</tt> before each test and stops it afterwards.
+ */
+public abstract class StompBaseTest extends CamelTestSupport {
+
+    protected BrokerService brokerService;
+    protected int numberOfMessages = 100;
+
+    /**
+     * Starts the broker and waits for it to be started before the Camel test
+     * support is set up.
+     */
+    @Override
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("stomp://localhost:61613?trace=true");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        super.setUp();
+    }
+
+    /**
+     * Tears down the Camel test support and then stops the broker.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // stop the broker even when the Camel tear down fails, so the
+            // connector port is not left bound for the tests that follow
+            if (brokerService != null) {
+                brokerService.stop();
+                brokerService.waitUntilStopped();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java
new file mode 100644
index 0000000..a2def8c
--- /dev/null
+++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.fusesource.stomp.client.BlockingConnection;
+import org.fusesource.stomp.client.Stomp;
+import org.fusesource.stomp.codec.StompFrame;
+import org.junit.Test;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.MESSAGE_ID;
+import static org.fusesource.stomp.client.Constants.SEND;
+
+/**
+ * Sends messages to <tt>/queue/test</tt> over a plain Stomp connection and
+ * verifies that the <tt>stomp:queue:test</tt> route delivers them to the mock
+ * endpoint.
+ */
+public class StompConsumerTest extends StompBaseTest {
+
+    @Test
+    public void testConsume() throws Exception {
+        Stomp stomp = new Stomp("tcp://localhost:61613");
+        final BlockingConnection producerConnection = stomp.connectBlocking();
+
+        try {
+            MockEndpoint mock = getMockEndpoint("mock:result");
+            mock.expectedMinimumMessageCount(numberOfMessages);
+
+            for (int i = 0; i < numberOfMessages; i++) {
+                StompFrame frame = new StompFrame(SEND);
+                frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test"));
+                frame.addHeader(MESSAGE_ID, StompFrame.encodeHeader("msg:" + i));
+                frame.content(utf8("Important Message " + i));
+                producerConnection.send(frame);
+            }
+
+            mock.await(5, TimeUnit.SECONDS);
+            mock.assertIsSatisfied();
+        } finally {
+            // release the test's own connection whether or not the assertions pass
+            producerConnection.close();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("stomp:queue:test")
+                        .transform(body().convertToString())
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java
new file mode 100644
index 0000000..fc878ea
--- /dev/null
+++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.fusesource.stomp.client.BlockingConnection;
+import org.fusesource.stomp.client.Stomp;
+import org.fusesource.stomp.codec.StompFrame;
+import org.junit.Test;
+
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.ID;
+import static org.fusesource.stomp.client.Constants.SUBSCRIBE;
+
+public class StompProducerTest extends StompBaseTest {
+    /** Pushes numberOfMessages through direct:foo and expects a raw STOMP subscriber to receive them all. */
+    @Test
+    public void testProduce() throws Exception {
+        Stomp stomp = new Stomp("tcp://localhost:61613");
+        final BlockingConnection subscribeConnection = stomp.connectBlocking();
+        try {
+            StompFrame frame = new StompFrame(SUBSCRIBE);
+            frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test"));
+            frame.addHeader(ID, subscribeConnection.nextId());
+            subscribeConnection.request(frame);
+
+            final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+            Thread thread = new Thread(new Runnable() {
+                public void run() {
+                    for (int i = 0; i < numberOfMessages; i++) {
+                        try {
+                            subscribeConnection.receive(); // frame content is not inspected, only counted
+                            latch.countDown();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            break;
+                        }
+                    }
+                }
+            });
+            thread.start();
+
+            Producer producer = context.getEndpoint("direct:foo").createProducer();
+            for (int i = 0; i < numberOfMessages; i++) {
+                Exchange exchange = producer.createExchange();
+                exchange.getIn().setBody("test message " + i);
+                producer.process(exchange);
+            }
+            latch.await(20, TimeUnit.SECONDS);
+            assertTrue("Messages not consumed = " + latch.getCount(), latch.getCount() == 0);
+        } finally {
+            subscribeConnection.close(); // release the client socket even when the assertion fails
+        }
+    }
+
+    /** Routes direct:foo to the STOMP queue this test subscribes to. */
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:foo").to("stomp:queue:test");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/resources/log4j.properties b/components/camel-stomp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..55f9354
--- /dev/null
+++ b/components/camel-stomp/src/test/resources/log4j.properties
@@ -0,0 +1,40 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=info, file
+
+#log4j.logger.twitter4j=DEBUG
+#log4j.logger.org.apache.camel.component.stomp=DEBUG
+#log4j.logger.org.apache.camel=DEBUG
+
+#log4j.logger.org.apache.activemq.transport.stomp=TRACE, out
+#log4j.additivity.org.apache.activemq.transport.stomp=false
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-stomp-test.log
+log4j.appender.file.append=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index cba4ea5..ce8e05b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -367,6 +367,7 @@
     <spymemcached-bundle-version>2.5_2</spymemcached-bundle-version> <!-- FIXME cmueller: not in sync! -->
     <spymemcached-version>2.8.4</spymemcached-version>
     <sshd-version>0.8.0</sshd-version>
+    <stompjms-version>1.13</stompjms-version>
     <stax-api-version>1.0.1</stax-api-version>
     <stax2-api-bundle-version>3.1.1</stax2-api-bundle-version>
     <stringtemplate-bundle-version>4.0.2_2</stringtemplate-bundle-version>


[3/3] git commit: CAMEL-6165: New camel-stomp component. Thanks to Dejan Bosanac for the contribution.

Posted by da...@apache.org.
CAMEL-6165: New camel-stomp component. Thanks to Dejan Bosanac for the contribution.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/64675d49
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/64675d49
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/64675d49

Branch: refs/heads/master
Commit: 64675d495be6abbf4831e29abb2cd33a5d58b107
Parents: 2beac53
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 31 13:17:13 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 31 13:35:21 2013 +0200

----------------------------------------------------------------------
 apache-camel/pom.xml                                   |  4 ++++
 apache-camel/src/main/descriptors/common-bin.xml       |  1 +
 components/pom.xml                                     |  1 +
 parent/pom.xml                                         |  5 +++++
 .../karaf/features/src/main/resources/features.xml     | 13 +++++++++++++
 5 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/64675d49/apache-camel/pom.xml
----------------------------------------------------------------------
diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index 6108f62..0680fab 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -556,6 +556,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-stomp</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-stringtemplate</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/64675d49/apache-camel/src/main/descriptors/common-bin.xml
----------------------------------------------------------------------
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index 6cfde8d..db31116 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -148,6 +148,7 @@
         <include>org.apache.camel:camel-sql</include>
         <include>org.apache.camel:camel-ssh</include>
         <include>org.apache.camel:camel-stax</include>
+        <include>org.apache.camel:camel-stomp</include>
         <include>org.apache.camel:camel-stream</include>
         <include>org.apache.camel:camel-stringtemplate</include>
         <include>org.apache.camel:camel-syslog</include>

http://git-wip-us.apache.org/repos/asf/camel/blob/64675d49/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index 3f5713d..f52e83f 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -167,6 +167,7 @@
     <module>camel-sql</module>
     <module>camel-ssh</module>
     <module>camel-stax</module>
+    <module>camel-stomp</module>
     <module>camel-stream</module>
     <module>camel-stringtemplate</module>
     <module>camel-syslog</module>

http://git-wip-us.apache.org/repos/asf/camel/blob/64675d49/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index ce8e05b..dce08e5 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -1113,6 +1113,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-stomp</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-stringtemplate</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/64675d49/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index a42c69f..864aeef 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -95,6 +95,8 @@
     <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.avro/${avro-bundle-version}</bundle>
     <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.avro-ipc/${avro-bundle-version}</bundle>
     <bundle dependency='true'>mvn:org.apache.commons/commons-compress/${commons-compress-version}</bundle>
+    <bundle dependency='true'>mvn:commons-lang/commons-lang/${commons-lang-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jetty/${jetty6-bundle-version}</bundle>
     <bundle>mvn:org.apache.camel/camel-avro/${project.version}</bundle>
   </feature>
   <feature name='camel-aws' version='${project.version}' resolver='(obr)' start-level='50'>
@@ -959,6 +961,17 @@
     <feature version='${project.version}'>camel-core</feature>
     <bundle>mvn:org.apache.camel/camel-stream/${project.version}</bundle>
   </feature>
+<!-- TODO: stompjms needs a new release to fix bad OSGi MANIFEST.MF data
+  <feature name='camel-stomp' version='${project.version}' resolver='(obr)' start-level='50'>
+    <feature version='${project.version}'>camel-core</feature>
+    <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jms_1.1_spec/${geronimo-jms-spec-version}</bundle>
+    <bundle dependency='true'>mvn:org.fusesource.stompjms/stompjms-client/${stompjms-version}</bundle>
+    <bundle dependency='true'>mvn:org.fusesource.hawtbuf/hawtbuf/${hawtbuf-version}</bundle>
+    <bundle dependency='true'>mvn:org.fusesource.hawtdispatch/hawtdispatch/${hawtdispatch-version}</bundle>
+    <bundle dependency='true'>mvn:org.fusesource.hawtdispatch/hawtdispatch-transport/${hawtdispatch-version}</bundle>
+    <bundle>mvn:org.apache.camel/camel-stomp/${project.version}</bundle>
+  </feature>
+-->
   <feature name='camel-string-template' version='${project.version}' resolver='(obr)' start-level='50'>
     <feature version='${project.version}'>camel-core</feature>
     <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.antlr/${antlr-bundle-version}</bundle>


[2/3] git commit: Upgraded AHC

Posted by da...@apache.org.
Upgraded AHC


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d9367377
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d9367377
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d9367377

Branch: refs/heads/master
Commit: d93673774c13654c5be68a88acfb3ecedac9b943
Parents: e7db4ab
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 31 10:13:42 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 31 13:17:34 2013 +0200

----------------------------------------------------------------------
 parent/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d9367377/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 78a48b4..cba4ea5 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -37,7 +37,7 @@
     <abdera-version>1.1.3</abdera-version>
     <!-- Note that activemq dependency is only used for testing! -->
     <activemq-version>5.8.0</activemq-version>
-    <ahc-version>1.7.13</ahc-version>
+    <ahc-version>1.7.19</ahc-version>
     <ant-bundle-version>1.7.0_6</ant-bundle-version>
     <antlr-bundle-version>3.4_1</antlr-bundle-version>
     <antlr-runtime-bundle-version>3.4_2</antlr-runtime-bundle-version>