You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/12 12:48:08 UTC
[4/5] git commit: CAMEL-6823: Fixed camel-stomp to allow configuring
options from endpoint uris.
CAMEL-6823: Fixed camel-stomp to allow configuring options from endpoint uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6dc0d840
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6dc0d840
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6dc0d840
Branch: refs/heads/master
Commit: 6dc0d840d4d560464b3174fd1cd03836f6cb78c8
Parents: 1c73643
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 12 12:47:32 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 12 12:47:32 2013 +0200
----------------------------------------------------------------------
.../camel/component/stomp/StompComponent.java | 9 ++-
.../component/stomp/StompConfiguration.java | 24 +++----
.../camel/component/stomp/StompEndpoint.java | 12 ++--
.../camel/component/stomp/StompBaseTest.java | 6 +-
.../component/stomp/StompConsumerUriTest.java | 69 ++++++++++++++++++++
5 files changed, 100 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6dc0d840/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
index 165c24e..a7ac94b 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
@@ -23,13 +23,18 @@ import org.apache.camel.impl.DefaultComponent;
public class StompComponent extends DefaultComponent {
- StompConfiguration configuration = new StompConfiguration();
+ private StompConfiguration configuration = new StompConfiguration();
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
String destination = "/" + remaining.replaceAll(":", "/");
- StompEndpoint endpoint = new StompEndpoint(uri, this, getConfiguration(), destination);
+ // must copy config so we do not have side effects
+ StompConfiguration config = getConfiguration().copy();
+ // allow to configure configuration from uri parameters
+ setProperties(config, parameters);
+
+ StompEndpoint endpoint = new StompEndpoint(uri, this, config, destination);
setProperties(endpoint, parameters);
return endpoint;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6dc0d840/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
index 664215b..091c2f7 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
@@ -16,15 +16,25 @@
*/
package org.apache.camel.component.stomp;
-import org.fusesource.stomp.client.Stomp;
+import org.apache.camel.RuntimeCamelException;
-public class StompConfiguration {
+public class StompConfiguration implements Cloneable {
private String brokerURL = "tcp://localhost:61613";
private String login;
private String passcode;
- private Stomp stomp;
+ /**
+ * Returns a copy of this configuration
+ */
+ public StompConfiguration copy() {
+ try {
+ StompConfiguration copy = (StompConfiguration) clone();
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
public String getBrokerURL() {
return brokerURL;
@@ -50,12 +60,4 @@ public class StompConfiguration {
this.passcode = passcode;
}
- public Stomp getStomp() throws Exception {
- if (stomp == null) {
- stomp = new Stomp(brokerURL);
- stomp.setLogin(login);
- stomp.setPasscode(passcode);
- }
- return stomp;
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6dc0d840/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
index 623dea0..d2fbb0e 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
@@ -30,6 +30,7 @@ import org.fusesource.hawtdispatch.Task;
import org.fusesource.stomp.client.Callback;
import org.fusesource.stomp.client.CallbackConnection;
import org.fusesource.stomp.client.Promise;
+import org.fusesource.stomp.client.Stomp;
import org.fusesource.stomp.codec.StompFrame;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
@@ -45,6 +46,7 @@ public class StompEndpoint extends DefaultEndpoint {
private CallbackConnection connection;
private StompConfiguration configuration;
private String destination;
+ private Stomp stomp;
private final List<StompConsumer> consumers = new CopyOnWriteArrayList<StompConsumer>();
@@ -70,7 +72,10 @@ public class StompEndpoint extends DefaultEndpoint {
protected void doStart() throws Exception {
final Promise<CallbackConnection> promise = new Promise<CallbackConnection>();
- configuration.getStomp().connectCallback(promise);
+ stomp = new Stomp(configuration.getBrokerURL());
+ stomp.setLogin(configuration.getLogin());
+ stomp.setPasscode(configuration.getPasscode());
+ stomp.connectCallback(promise);
connection = promise.await();
@@ -125,11 +130,6 @@ public class StompEndpoint extends DefaultEndpoint {
});
}
- @Override
- protected String createEndpointUri() {
- return super.createEndpointUri();
- }
-
void addConsumer(final StompConsumer consumer) {
connection.getDispatchQueue().execute(new Task() {
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/6dc0d840/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
index 052ab75..3193c86 100644
--- a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
+++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
@@ -24,12 +24,16 @@ public abstract class StompBaseTest extends CamelTestSupport {
protected BrokerService brokerService;
protected int numberOfMessages = 100;
+ protected int getPort() {
+ return 61613;
+ }
+
@Override
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
- brokerService.addConnector("stomp://localhost:61613?trace=true");
+ brokerService.addConnector("stomp://localhost:" + getPort() + "?trace=true");
brokerService.start();
brokerService.waitUntilStarted();
super.setUp();
http://git-wip-us.apache.org/repos/asf/camel/blob/6dc0d840/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java
new file mode 100644
index 0000000..a87e943
--- /dev/null
+++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.fusesource.stomp.client.BlockingConnection;
+import org.fusesource.stomp.client.Stomp;
+import org.fusesource.stomp.codec.StompFrame;
+import org.junit.Test;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.MESSAGE_ID;
+import static org.fusesource.stomp.client.Constants.SEND;
+
+public class StompConsumerUriTest extends StompBaseTest {
+
+ @Override
+ protected int getPort() {
+ return 61614;
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ Stomp stomp = new Stomp("tcp://localhost:" + getPort());
+ final BlockingConnection producerConnection = stomp.connectBlocking();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(numberOfMessages);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ StompFrame frame = new StompFrame(SEND);
+ frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test"));
+ frame.addHeader(MESSAGE_ID, StompFrame.encodeHeader("msg:" + i));
+ frame.content(utf8("Important Message " + i));
+ producerConnection.send(frame);
+ }
+
+ mock.await(5, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("stomp:queue:test?brokerURL=tcp://localhost:61614")
+ .transform(body().convertToString())
+ .to("mock:result");
+ }
+ };
+ }
+}