You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/01/11 14:27:44 UTC

camel git commit: CAMEL-10695: camel-mqtt: TimeoutException thrown on MQTTEndpoint stop

Repository: camel
Updated Branches:
  refs/heads/master 879781312 -> 66a11e0c9


CAMEL-10695: camel-mqtt: TimeoutException thrown on MQTTEndpoint stop


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66a11e0c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66a11e0c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66a11e0c

Branch: refs/heads/master
Commit: 66a11e0c96a4abb3d8902f1be96ac4bc33293d44
Parents: 8797813
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed Jan 11 15:22:19 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Wed Jan 11 15:23:30 2017 +0100

----------------------------------------------------------------------
 components/camel-mqtt/pom.xml                   |  5 +++
 .../camel/component/mqtt/MQTTEndpoint.java      |  8 ++--
 .../camel/component/mqtt/MQTTBaseTest.java      |  5 +--
 .../camel/component/mqtt/MQTTTestSupport.java   | 19 +++++++++
 .../camel/component/mqtt/SpringMQTTTest.java    | 35 ++++++++++++++++
 .../camel/component/mqtt/SpringMQTTTest.xml     | 44 ++++++++++++++++++++
 6 files changed, 109 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/pom.xml b/components/camel-mqtt/pom.xml
index 357f7f6..8a91e18 100644
--- a/components/camel-mqtt/pom.xml
+++ b/components/camel-mqtt/pom.xml
@@ -52,6 +52,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-spring</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-broker</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 71e333e..455b7a8 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -278,25 +278,27 @@ public class MQTTEndpoint extends DefaultEndpoint implements AsyncEndpoint {
         });
     }
 
+    @Override
     protected void doStop() throws Exception {
         super.doStop();
 
-        if (connection != null) {
-            final Promise<Void> promise = new Promise<Void>();
+        if (connection != null && connected) {
+            final Promise<Void> promise = new Promise<>();
             connection.getDispatchQueue().execute(new Task() {
                 @Override
                 public void run() {
                     connection.disconnect(new Callback<Void>() {
                         public void onSuccess(Void value) {
+                            connected = false;
                             promise.onSuccess(value);
                         }
-
                         public void onFailure(Throwable value) {
                             promise.onFailure(value);
                         }
                     });
                 }
             });
+
             promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
index 7d6280f..476abf4 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java
@@ -36,10 +36,7 @@ public abstract class MQTTBaseTest extends CamelTestSupport {
 
 
     public void setUp() throws Exception {
-        brokerService = new BrokerService();
-        brokerService.setPersistent(false);
-        brokerService.setAdvisorySupport(false);
-        brokerService.addConnector(MQTTTestSupport.getConnection());
+        brokerService = MQTTTestSupport.newBrokerService();
         brokerService.start();
         super.setUp();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java
index d521b10..f870855 100644
--- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.component.mqtt;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.camel.test.AvailablePortFinder;
 
 /**
@@ -58,4 +59,22 @@ public final class MQTTTestSupport {
     public static String getHostForMQTTEndpoint() {
         return HOST;
     }
+
+
+
+    public static BrokerService newBrokerService() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setAdvisorySupport(false);
+        service.addConnector(getConnection());
+
+        return service;
+    }
+
+    public static MQTTComponent newComponent() throws Exception {
+        MQTTComponent component = new MQTTComponent();
+        component.setHost(getHostForMQTTEndpoint());
+
+        return component;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java
new file mode 100644
index 0000000..d5cdb09
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext
+public class SpringMQTTTest extends CamelSpringTestSupport {
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/mqtt/SpringMQTTTest.xml");
+    }
+
+    @Test
+    public void simpleTest() throws Exception {
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml b/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml
new file mode 100644
index 0000000..7574675
--- /dev/null
+++ b/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean id="broker"
+        class="org.apache.camel.component.mqtt.MQTTTestSupport"
+        factory-method="newBrokerService"
+        init-method="start"
+        destroy-method="stop"/>
+
+  <bean id="mqtt"
+        class="org.apache.camel.component.mqtt.MQTTTestSupport"
+        factory-method="newComponent"/>
+
+  <camelContext id="basic-mqtt-component" xmlns="http://camel.apache.org/schema/spring" autoStartup="true">
+
+    <endpoint id="mqtt-endpoint" uri="mqtt:mqtt-endpoint?publishTopicName=basicMqtt/message/basic-1"/>
+
+    <route>
+      <from uri="direct:in"/>
+      <to uri="ref:mqtt-endpoint"/>
+    </route>
+  </camelContext>
+
+</beans>