You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/12 12:48:09 UTC

[5/5] git commit: CAMEL-6823: Fixed camel-stomp to allow configuring options from endpoint uris.

CAMEL-6823: Fixed camel-stomp to allow configuring options from endpoint uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aeaeb3e5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aeaeb3e5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aeaeb3e5

Branch: refs/heads/camel-2.12.x
Commit: aeaeb3e5dfdaf71c67929b4b4622f9b87e672b2e
Parents: 0ecc2f1
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 12 12:47:32 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 12 12:47:54 2013 +0200

----------------------------------------------------------------------
 .../camel/component/stomp/StompComponent.java   |  9 ++-
 .../component/stomp/StompConfiguration.java     | 24 +++----
 .../camel/component/stomp/StompEndpoint.java    | 12 ++--
 .../camel/component/stomp/StompBaseTest.java    |  6 +-
 .../component/stomp/StompConsumerUriTest.java   | 69 ++++++++++++++++++++
 5 files changed, 100 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
index 165c24e..a7ac94b 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
@@ -23,13 +23,18 @@ import org.apache.camel.impl.DefaultComponent;
 
 public class StompComponent extends DefaultComponent {
 
-    StompConfiguration configuration = new StompConfiguration();
+    private StompConfiguration configuration = new StompConfiguration();
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         String destination = "/" + remaining.replaceAll(":", "/");
 
-        StompEndpoint endpoint = new StompEndpoint(uri, this, getConfiguration(), destination);
+        // must copy config so we do not have side effects
+        StompConfiguration config = getConfiguration().copy();
+        // allow to configure configuration from uri parameters
+        setProperties(config, parameters);
+
+        StompEndpoint endpoint = new StompEndpoint(uri, this, config, destination);
         setProperties(endpoint, parameters);
         return endpoint;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
index 664215b..091c2f7 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
@@ -16,15 +16,25 @@
  */
 package org.apache.camel.component.stomp;
 
-import org.fusesource.stomp.client.Stomp;
+import org.apache.camel.RuntimeCamelException;
 
-public class StompConfiguration {
+public class StompConfiguration implements Cloneable {
 
     private String brokerURL = "tcp://localhost:61613";
     private String login;
     private String passcode;
 
-    private Stomp stomp;
+    /**
+     * Returns a copy of this configuration
+     */
+    public StompConfiguration copy() {
+        try {
+            StompConfiguration copy = (StompConfiguration) clone();
+            return copy;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
 
     public String getBrokerURL() {
         return brokerURL;
@@ -50,12 +60,4 @@ public class StompConfiguration {
         this.passcode = passcode;
     }
 
-    public Stomp getStomp() throws Exception {
-        if (stomp == null) {
-            stomp = new Stomp(brokerURL);
-            stomp.setLogin(login);
-            stomp.setPasscode(passcode);
-        }
-        return stomp;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
index 623dea0..d2fbb0e 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
@@ -30,6 +30,7 @@ import org.fusesource.hawtdispatch.Task;
 import org.fusesource.stomp.client.Callback;
 import org.fusesource.stomp.client.CallbackConnection;
 import org.fusesource.stomp.client.Promise;
+import org.fusesource.stomp.client.Stomp;
 import org.fusesource.stomp.codec.StompFrame;
 
 import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
@@ -45,6 +46,7 @@ public class StompEndpoint extends DefaultEndpoint {
     private CallbackConnection connection;
     private StompConfiguration configuration;
     private String destination;
+    private Stomp stomp;
 
     private final List<StompConsumer> consumers = new CopyOnWriteArrayList<StompConsumer>();
 
@@ -70,7 +72,10 @@ public class StompEndpoint extends DefaultEndpoint {
     protected void doStart() throws Exception {
         final Promise<CallbackConnection> promise = new Promise<CallbackConnection>();
 
-        configuration.getStomp().connectCallback(promise);
+        stomp = new Stomp(configuration.getBrokerURL());
+        stomp.setLogin(configuration.getLogin());
+        stomp.setPasscode(configuration.getPasscode());
+        stomp.connectCallback(promise);
 
         connection = promise.await();
 
@@ -125,11 +130,6 @@ public class StompEndpoint extends DefaultEndpoint {
         });
     }
 
-    @Override
-    protected String createEndpointUri() {
-        return super.createEndpointUri();
-    }
-
     void addConsumer(final StompConsumer consumer) {
         connection.getDispatchQueue().execute(new Task() {
             @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
index 052ab75..3193c86 100644
--- a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
+++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
@@ -24,12 +24,16 @@ public abstract class StompBaseTest extends CamelTestSupport {
     protected BrokerService brokerService;
     protected int numberOfMessages = 100;
 
+    protected int getPort() {
+        return 61613;
+    }
+
     @Override
     public void setUp() throws Exception {
         brokerService = new BrokerService();
         brokerService.setPersistent(false);
         brokerService.setAdvisorySupport(false);
-        brokerService.addConnector("stomp://localhost:61613?trace=true");
+        brokerService.addConnector("stomp://localhost:" + getPort() + "?trace=true");
         brokerService.start();
         brokerService.waitUntilStarted();
         super.setUp();

http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java
new file mode 100644
index 0000000..a87e943
--- /dev/null
+++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.fusesource.stomp.client.BlockingConnection;
+import org.fusesource.stomp.client.Stomp;
+import org.fusesource.stomp.codec.StompFrame;
+import org.junit.Test;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.MESSAGE_ID;
+import static org.fusesource.stomp.client.Constants.SEND;
+
+public class StompConsumerUriTest extends StompBaseTest {
+
+    @Override
+    protected int getPort() {
+        return 61614;
+    }
+
+    @Test
+    public void testConsume() throws Exception {
+        Stomp stomp = new Stomp("tcp://localhost:" + getPort());
+        final BlockingConnection producerConnection = stomp.connectBlocking();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(numberOfMessages);
+
+        for (int i = 0; i < numberOfMessages; i++) {
+            StompFrame frame = new StompFrame(SEND);
+            frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test"));
+            frame.addHeader(MESSAGE_ID, StompFrame.encodeHeader("msg:" + i));
+            frame.content(utf8("Important Message " + i));
+            producerConnection.send(frame);
+        }
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("stomp:queue:test?brokerURL=tcp://localhost:61614")
+                        .transform(body().convertToString())
+                        .to("mock:result");
+            }
+        };
+    }
+}