You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/12/19 07:16:06 UTC

[camel] branch master updated (44dacd6 -> 2e65514)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 44dacd6  Destroying some secret keys after they're used
     new 9fcaebe  CAMEL-13014 fix stealing link for clientId (looped error)
     new c599e5f  CAMEL-13014, fix MQTTConfigurationTest junit test
     new 2e65514  CAMEL-13014 - Fixed CS

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/component/mqtt/MQTTEndpoint.java     |  1 +
 .../camel/component/mqtt/MQTTConfigurationTest.java       |  6 +++---
 .../{MQTTProducerTest.java => MQTTLoopProducerTest.java}  | 15 +++++++++------
 3 files changed, 13 insertions(+), 9 deletions(-)
 copy components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/{MQTTProducerTest.java => MQTTLoopProducerTest.java} (81%)


[camel] 01/03: CAMEL-13014 fix stealing link for clientId (looped error)

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9fcaebee3ce95cc583a6b4680b00fb6013dbbe3b
Author: Fabrizio Spataro <fa...@bizmate.it>
AuthorDate: Tue Dec 18 16:44:32 2018 +0100

    CAMEL-13014 fix stealing link for clientId (looped error)
---
 .../apache/camel/component/mqtt/MQTTEndpoint.java  |  1 +
 .../camel/component/mqtt/MQTTLoopProducerTest.java | 81 ++++++++++++++++++++++
 2 files changed, 82 insertions(+)

diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 5b70097..1284965 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -228,6 +228,7 @@ public class MQTTEndpoint extends DefaultEndpoint implements AsyncEndpoint {
         super.doStart();
 
         createConnection();
+        connect();
     }
 
     protected void createConnection() {
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
new file mode 100644
index 0000000..1266269
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+
+public class MQTTLoopProducerTest extends MQTTBaseTest {
+    @Test
+    public void testProduce() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setHost(MQTTTestSupport.getHostForMQTTEndpoint());
+        final BlockingConnection subscribeConnection = mqtt.blockingConnection();
+        subscribeConnection.connect();
+        Topic topic = new Topic(TEST_TOPIC, QoS.AT_MOST_ONCE);
+        Topic[] topics = {topic};
+        subscribeConnection.subscribe(topics);
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages * 2); //2 publishers
+
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                for (int i = 0; i < numberOfMessages * 2; i++) {
+                    try {
+                        Message message = subscribeConnection.receive();
+                        message.ack();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+                }
+            }
+        });
+        thread.start();
+
+        Producer producer = context.getEndpoint("direct:foo").createProducer();
+        for (int i = 0; i < numberOfMessages; i++) {
+            Exchange exchange = producer.createExchange();
+            exchange.getIn().setBody("test message " + i);
+            producer.process(exchange);
+        }
+        latch.await(10, TimeUnit.SECONDS);
+        assertTrue("Messages not consumed = " + latch.getCount(), latch.getCount() == 0);
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:foo")
+                .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, constant(TEST_TOPIC))
+                .to("mqtt:boo1?host=" + MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce")
+                .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, constant(TEST_TOPIC))
+                .to("mqtt:boo2?host=" + MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce");
+            }
+        };
+    }
+}


[camel] 03/03: CAMEL-13014 - Fixed CS

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2e6551489a7a213bc6064ab43471749a2b25aff1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Dec 19 08:15:44 2018 +0100

    CAMEL-13014 - Fixed CS
---
 .../org/apache/camel/component/mqtt/MQTTLoopProducerTest.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
index 1266269..e28e0e4 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTLoopProducerTest.java
@@ -71,10 +71,10 @@ public class MQTTLoopProducerTest extends MQTTBaseTest {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:foo")
-                .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, constant(TEST_TOPIC))
-                .to("mqtt:boo1?host=" + MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce")
-                .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, constant(TEST_TOPIC))
-                .to("mqtt:boo2?host=" + MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce");
+                    .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, constant(TEST_TOPIC))
+                    .to("mqtt:boo1?host=" + MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce")
+                    .setHeader(MQTTConfiguration.MQTT_PUBLISH_TOPIC, constant(TEST_TOPIC))
+                    .to("mqtt:boo2?host=" + MQTTTestSupport.getHostForMQTTEndpoint() + "&qualityOfService=AtMostOnce");
             }
         };
     }


[camel] 02/03: CAMEL-13014, fix MQTTConfigurationTest junit test

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c599e5f339eecbd6780e36169ffc933a46ea02bb
Author: Fabrizio Spataro fabryprog <fa...@gmail.com>
AuthorDate: Tue Dec 18 22:32:06 2018 +0100

    CAMEL-13014, fix MQTTConfigurationTest junit test
---
 .../java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
index 24e0ad4..dcc2d21 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java
@@ -63,7 +63,7 @@ public class MQTTConfigurationTest extends MQTTBaseTest {
 
     @Test
     public void testExactlyOnceQualityOfServiceConfiguration() throws Exception {
-        Endpoint endpoint = context.getEndpoint("mqtt:todo?qualityOfService=exactlyOnce");
+        Endpoint endpoint = context.getEndpoint("mqtt:todo?qualityOfService=exactlyOnce&host=" + MQTTTestSupport.getHostForMQTTEndpoint());
         assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint instanceof MQTTEndpoint);
         MQTTEndpoint mqttEndpoint = (MQTTEndpoint)endpoint;
 
@@ -72,7 +72,7 @@ public class MQTTConfigurationTest extends MQTTBaseTest {
 
     @Test
     public void testAtLeastOnceQualityOfServiceConfiguration() throws Exception {
-        Endpoint endpoint = context.getEndpoint("mqtt:todo?qualityOfService=AtLeastOnce");
+        Endpoint endpoint = context.getEndpoint("mqtt:todo?qualityOfService=AtLeastOnce&host=" + MQTTTestSupport.getHostForMQTTEndpoint());
         assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint instanceof MQTTEndpoint);
         MQTTEndpoint mqttEndpoint = (MQTTEndpoint)endpoint;
 
@@ -81,7 +81,7 @@ public class MQTTConfigurationTest extends MQTTBaseTest {
 
     @Test
     public void testAtMostOnceQualityOfServiceConfiguration() throws Exception {
-        Endpoint endpoint = context.getEndpoint("mqtt:todo?qualityOfService=AtMostOnce");
+        Endpoint endpoint = context.getEndpoint("mqtt:todo?qualityOfService=AtMostOnce&host=" + MQTTTestSupport.getHostForMQTTEndpoint());
         assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint instanceof MQTTEndpoint);
         MQTTEndpoint mqttEndpoint = (MQTTEndpoint)endpoint;