You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/04/06 07:30:34 UTC

[streampipes] branch 1485-mqtt-connection-fails updated: Add integration test for MQTT (#1485)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 1485-mqtt-connection-fails
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1485-mqtt-connection-fails by this push:
     new 95abf4628 Add integration test for MQTT (#1485)
95abf4628 is described below

commit 95abf4628c4f97b47ae317ce5a73af93d980e824
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Apr 6 09:30:13 2023 +0200

    Add integration test for MQTT (#1485)
---
 .../pe/shared/config/mqtt/MqttConnectUtils.java    |   6 +-
 .../sinks/brokers/jvm/mqtt/MqttClient.java         |   4 +-
 streampipes-integration-tests/pom.xml              |  10 ++
 .../integration/adapters/AdapterTesterBase.java    |   3 +-
 .../integration/adapters/AdaptersTest.java         |   7 ++
 .../integration/adapters/MqttAdapterTester.java    | 127 +++++++++++++++++++++
 .../integration/containers/MosquittoContainer.java |  52 +++++++++
 .../MosquittoDevContainer.java}                    |  24 +---
 .../src/test/resources/mosquitto.conf              |  21 ++++
 .../couchdb/impl/ConnectionStorageImpl.java        |   3 +-
 10 files changed, 233 insertions(+), 24 deletions(-)

diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
index 4b6d83367..e1b82116a 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
@@ -52,11 +52,15 @@ public class MqttConnectUtils {
   }
 
   public static StaticPropertyAlternative getAlternativesOne() {
-    //return Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", ""));
     return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
 
   }
 
+  public static StaticPropertyAlternative getAlternativesOne(boolean selected) {
+    return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS), selected);
+
+  }
+
   public static StaticPropertyAlternative getAlternativesTwo() {
     return Alternatives.from(Labels.withId(USERNAME_ACCESS),
         StaticProperties.group(Labels.withId(USERNAME_GROUP),
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java
index 280d980dd..29c88dfc4 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java
@@ -155,7 +155,9 @@ public class MqttClient {
    */
   public void disconnect() {
     try {
-      this.conn.disconnect();
+      if (this.conn.isConnected()) {
+        this.conn.disconnect();
+      }
     } catch (Exception e) {
       throw new SpRuntimeException("Could not disconnect from MQTT broker: "
           + uri.toString() + ", " + e.getMessage(), e);
diff --git a/streampipes-integration-tests/pom.xml b/streampipes-integration-tests/pom.xml
index 93d8e2ea7..60309278e 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -49,6 +49,11 @@
             <artifactId>pulsar-client</artifactId>
             <version>${pulsar.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging-mqtt</artifactId>
+            <version>0.92.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-connect-adapters-iiot</artifactId>
@@ -56,6 +61,11 @@
             <classifier>embed</classifier>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-pipeline-elements-shared</artifactId>
+            <version>0.92.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-service-discovery</artifactId>
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
index e2134bf35..1140dc516 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.integration.adapters;
 
+import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
 import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
 import org.apache.streampipes.extensions.management.connect.AdapterUtils;
@@ -32,7 +33,7 @@ public abstract class AdapterTesterBase implements AutoCloseable {
   Adapter adapter;
 
   public Adapter startAdapter(AdapterDescription adapterDescription) throws AdapterException {
-    DeclarersSingleton.getInstance().add(new PulsarProtocol());
+    DeclarersSingleton.getInstance().add(new PulsarProtocol()).add(new MqttProtocol());
     Adapter adapter = (Adapter) AdapterUtils.setAdapter(adapterDescription);
     adapter.startAdapter();
     this.adapter = adapter;
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
index 9d1801294..8b64157e4 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
@@ -39,4 +39,11 @@ public class AdaptersTest {
       testAdapter(pulsarAdapterTester);
     }
   }
+
+  @Test
+  public void testMqttAdapter() throws Exception {
+    try (MqttAdapterTester mqttAdapterTester = new MqttAdapterTester()) {
+      testAdapter(mqttAdapterTester);
+    }
+  }
 }
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
new file mode 100644
index 000000000..ee8c761f4
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.adapters;
+
+import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
+import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
+import org.apache.streampipes.extensions.management.connect.adapter.format.json.object.JsonObjectFormat;
+import org.apache.streampipes.integration.containers.MosquittoContainer;
+import org.apache.streampipes.integration.containers.MosquittoDevContainer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.rules.DebugSinkRuleDescription;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils;
+import org.apache.streampipes.sdk.builder.adapter.GenericDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class MqttAdapterTester extends AdapterTesterBase {
+
+  MosquittoContainer mosquittoContainer;
+
+  private static final String TOPIC = "testtopic";
+
+  @Override
+  public void startAdapterService() throws Exception {
+    if (Objects.equals(System.getenv("TEST_MODE"), "dev")) {
+      mosquittoContainer = new MosquittoDevContainer();
+    } else {
+      mosquittoContainer = new MosquittoContainer();
+    }
+    mosquittoContainer.start();
+  }
+
+  @Override
+  public AdapterDescription prepareAdapter() throws Exception {
+    return GenericDataStreamAdapterBuilder
+        .create(MqttProtocol.ID)
+        .format(new JsonObjectFormat()
+            .declareModel())
+        .protocol(ProtocolDescriptionBuilder.create(MqttProtocol.ID)
+            .requiredTextParameter(
+                Labels.withId(MqttConnectUtils.BROKER_URL),
+                mosquittoContainer.getBrokerUrl())
+            .requiredTextParameter(
+                Labels.withId(MqttConnectUtils.TOPIC), TOPIC)
+            .requiredAlternatives(
+                MqttConnectUtils.getAccessModeLabel(),
+                MqttConnectUtils.getAlternativesOne(true),
+                MqttConnectUtils.getAlternativesTwo())
+            .build())
+        .addRules(List.of(new DebugSinkRuleDescription()))
+        .build();
+  }
+
+  @Override
+  public List<Map<String, Object>> generateData() throws Exception {
+    List<Map<String, Object>> result = new ArrayList<>();
+    MqttTransportProtocol mqttSettings = makeMqttSettings();
+    MqttPublisher publisher = new MqttPublisher();
+    publisher.connect(mqttSettings);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    for (int i = 0; i < 3; i++) {
+      var dataMap = new HashMap<String, Object>();
+      dataMap.put("timestamp", i);
+      dataMap.put("value", "test-data");
+      byte[] data = objectMapper.writeValueAsBytes(dataMap);
+      publisher.publish(data);
+      result.add(dataMap);
+    }
+
+    publisher.disconnect();
+    return result;
+  }
+
+  private MqttTransportProtocol makeMqttSettings() {
+    return new MqttTransportProtocol(mosquittoContainer.getBrokerHost(), mosquittoContainer.getBrokerPort(), TOPIC);
+  }
+
+  @Override
+  public void validateData(List<Map<String, Object>> expectedData) throws Exception {
+    for (Map<String, Object> expected : expectedData) {
+      Map<String, Object> actual = takeEvent();
+      Assert.assertTrue(Maps.difference(expected, actual).areEqual());
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (mosquittoContainer != null) {
+      mosquittoContainer.stop();
+    }
+    try {
+      stopAdapter();
+    } catch (AdapterException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
new file mode 100644
index 000000000..8f37bfa9f
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.containers;
+
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+
+public class MosquittoContainer extends GenericContainer<MosquittoContainer> {
+
+  protected static final int MOSQUITTO_PORT = 1883;
+
+  public MosquittoContainer() {
+    super("eclipse-mosquitto:latest");
+  }
+
+  public void start() {
+    this.withExposedPorts(MOSQUITTO_PORT, MOSQUITTO_PORT);
+    this.withClasspathResourceMapping(
+        "mosquitto.conf",
+        "/mosquitto/config/mosquitto.conf",
+        BindMode.READ_ONLY);
+    super.start();
+  }
+
+  public String getBrokerHost() {
+    return getHost();
+  }
+
+  public Integer getBrokerPort() {
+    return getMappedPort(MOSQUITTO_PORT);
+  }
+
+  public String getBrokerUrl() {
+    return "tcp://" + getBrokerHost() + ":" + getBrokerPort();
+  }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoDevContainer.java
similarity index 51%
copy from streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
copy to streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoDevContainer.java
index 9d1801294..40e84fed8 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoDevContainer.java
@@ -15,28 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.integration.adapters;
 
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+package org.apache.streampipes.integration.containers;
 
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-public class AdaptersTest {
-  public void testAdapter(AdapterTesterBase adapterTester) throws Exception {
-    adapterTester.startAdapterService();
-    AdapterDescription adapterDescription = adapterTester.prepareAdapter();
-    adapterTester.startAdapter(adapterDescription);
-    List<Map<String, Object>> data = adapterTester.generateData();
-    adapterTester.validateData(data);
+public class MosquittoDevContainer extends MosquittoContainer {
+  public String getBrokerHost() {
+    return "localhost";
   }
 
-  @Test
-  public void testPulsarAdapter() throws Exception {
-    try (PulsarAdapterTester pulsarAdapterTester = new PulsarAdapterTester()) {
-      testAdapter(pulsarAdapterTester);
-    }
-  }
 }
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.conf b/streampipes-integration-tests/src/test/resources/mosquitto.conf
new file mode 100644
index 000000000..3c4318c1a
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.conf
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+allow_anonymous true
+listener 1883
+log_type all
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java
index de2110da9..6402d4b30 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java
@@ -98,7 +98,8 @@ public class ConnectionStorageImpl extends AbstractDao<Connection> implements
 
   private Optional<JsonObject> getFrequentConnections(String query) {
     try {
-      return Optional.of((JsonObject) new JsonParser().parse(Request.Get(query).execute().returnContent().asString()));
+      var request = Utils.append(Request.Get(query));
+      return Optional.of((JsonObject) new JsonParser().parse(request.execute().returnContent().asString()));
     } catch (IOException e) {
       e.printStackTrace();
       return Optional.empty();