You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/04/06 07:30:34 UTC
[streampipes] branch 1485-mqtt-connection-fails updated: Add integration test for MQTT (#1485)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 1485-mqtt-connection-fails
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1485-mqtt-connection-fails by this push:
new 95abf4628 Add integration test for MQTT (#1485)
95abf4628 is described below
commit 95abf4628c4f97b47ae317ce5a73af93d980e824
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Apr 6 09:30:13 2023 +0200
Add integration test for MQTT (#1485)
---
.../pe/shared/config/mqtt/MqttConnectUtils.java | 6 +-
.../sinks/brokers/jvm/mqtt/MqttClient.java | 4 +-
streampipes-integration-tests/pom.xml | 10 ++
.../integration/adapters/AdapterTesterBase.java | 3 +-
.../integration/adapters/AdaptersTest.java | 7 ++
.../integration/adapters/MqttAdapterTester.java | 127 +++++++++++++++++++++
.../integration/containers/MosquittoContainer.java | 52 +++++++++
.../MosquittoDevContainer.java} | 24 +---
.../src/test/resources/mosquitto.conf | 21 ++++
.../couchdb/impl/ConnectionStorageImpl.java | 3 +-
10 files changed, 233 insertions(+), 24 deletions(-)
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
index 4b6d83367..e1b82116a 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
@@ -52,11 +52,15 @@ public class MqttConnectUtils {
}
public static StaticPropertyAlternative getAlternativesOne() {
- //return Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", ""));
return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
}
+ public static StaticPropertyAlternative getAlternativesOne(boolean selected) {
+ return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS), selected);
+
+ }
+
public static StaticPropertyAlternative getAlternativesTwo() {
return Alternatives.from(Labels.withId(USERNAME_ACCESS),
StaticProperties.group(Labels.withId(USERNAME_GROUP),
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java
index 280d980dd..29c88dfc4 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttClient.java
@@ -155,7 +155,9 @@ public class MqttClient {
*/
public void disconnect() {
try {
- this.conn.disconnect();
+ if (this.conn.isConnected()) {
+ this.conn.disconnect();
+ }
} catch (Exception e) {
throw new SpRuntimeException("Could not disconnect from MQTT broker: "
+ uri.toString() + ", " + e.getMessage(), e);
diff --git a/streampipes-integration-tests/pom.xml b/streampipes-integration-tests/pom.xml
index 93d8e2ea7..60309278e 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -49,6 +49,11 @@
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-messaging-mqtt</artifactId>
+ <version>0.92.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connect-adapters-iiot</artifactId>
@@ -56,6 +61,11 @@
<classifier>embed</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-pipeline-elements-shared</artifactId>
+ <version>0.92.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-service-discovery</artifactId>
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
index e2134bf35..1140dc516 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.integration.adapters;
+import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
import org.apache.streampipes.extensions.management.connect.AdapterUtils;
@@ -32,7 +33,7 @@ public abstract class AdapterTesterBase implements AutoCloseable {
Adapter adapter;
public Adapter startAdapter(AdapterDescription adapterDescription) throws AdapterException {
- DeclarersSingleton.getInstance().add(new PulsarProtocol());
+ DeclarersSingleton.getInstance().add(new PulsarProtocol()).add(new MqttProtocol());
Adapter adapter = (Adapter) AdapterUtils.setAdapter(adapterDescription);
adapter.startAdapter();
this.adapter = adapter;
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
index 9d1801294..8b64157e4 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
@@ -39,4 +39,11 @@ public class AdaptersTest {
testAdapter(pulsarAdapterTester);
}
}
+
+ @Test
+ public void testMqttAdapter() throws Exception {
+ try (MqttAdapterTester mqttAdapterTester = new MqttAdapterTester()) {
+ testAdapter(mqttAdapterTester);
+ }
+ }
}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
new file mode 100644
index 000000000..ee8c761f4
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.adapters;
+
+import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
+import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
+import org.apache.streampipes.extensions.management.connect.adapter.format.json.object.JsonObjectFormat;
+import org.apache.streampipes.integration.containers.MosquittoContainer;
+import org.apache.streampipes.integration.containers.MosquittoDevContainer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.connect.rules.DebugSinkRuleDescription;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils;
+import org.apache.streampipes.sdk.builder.adapter.GenericDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class MqttAdapterTester extends AdapterTesterBase {
+
+ MosquittoContainer mosquittoContainer;
+
+ private static final String TOPIC = "testtopic";
+
+ @Override
+ public void startAdapterService() throws Exception {
+ if (Objects.equals(System.getenv("TEST_MODE"), "dev")) {
+ mosquittoContainer = new MosquittoDevContainer();
+ } else {
+ mosquittoContainer = new MosquittoContainer();
+ }
+ mosquittoContainer.start();
+ }
+
+ @Override
+ public AdapterDescription prepareAdapter() throws Exception {
+ return GenericDataStreamAdapterBuilder
+ .create(MqttProtocol.ID)
+ .format(new JsonObjectFormat()
+ .declareModel())
+ .protocol(ProtocolDescriptionBuilder.create(MqttProtocol.ID)
+ .requiredTextParameter(
+ Labels.withId(MqttConnectUtils.BROKER_URL),
+ mosquittoContainer.getBrokerUrl())
+ .requiredTextParameter(
+ Labels.withId(MqttConnectUtils.TOPIC), TOPIC)
+ .requiredAlternatives(
+ MqttConnectUtils.getAccessModeLabel(),
+ MqttConnectUtils.getAlternativesOne(true),
+ MqttConnectUtils.getAlternativesTwo())
+ .build())
+ .addRules(List.of(new DebugSinkRuleDescription()))
+ .build();
+ }
+
+ @Override
+ public List<Map<String, Object>> generateData() throws Exception {
+ List<Map<String, Object>> result = new ArrayList<>();
+ MqttTransportProtocol mqttSettings = makeMqttSettings();
+ MqttPublisher publisher = new MqttPublisher();
+ publisher.connect(mqttSettings);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ for (int i = 0; i < 3; i++) {
+ var dataMap = new HashMap<String, Object>();
+ dataMap.put("timestamp", i);
+ dataMap.put("value", "test-data");
+ byte[] data = objectMapper.writeValueAsBytes(dataMap);
+ publisher.publish(data);
+ result.add(dataMap);
+ }
+
+ publisher.disconnect();
+ return result;
+ }
+
+ private MqttTransportProtocol makeMqttSettings() {
+ return new MqttTransportProtocol(mosquittoContainer.getBrokerHost(), mosquittoContainer.getBrokerPort(), TOPIC);
+ }
+
+ @Override
+ public void validateData(List<Map<String, Object>> expectedData) throws Exception {
+ for (Map<String, Object> expected : expectedData) {
+ Map<String, Object> actual = takeEvent();
+ Assert.assertTrue(Maps.difference(expected, actual).areEqual());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (mosquittoContainer != null) {
+ mosquittoContainer.stop();
+ }
+ try {
+ stopAdapter();
+ } catch (AdapterException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
new file mode 100644
index 000000000..8f37bfa9f
--- /dev/null
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.containers;
+
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+
+public class MosquittoContainer extends GenericContainer<MosquittoContainer> {
+
+ protected static final int MOSQUITTO_PORT = 1883;
+
+ public MosquittoContainer() {
+ super("eclipse-mosquitto:latest");
+ }
+
+ public void start() {
+ this.withExposedPorts(MOSQUITTO_PORT, MOSQUITTO_PORT);
+ this.withClasspathResourceMapping(
+ "mosquitto.conf",
+ "/mosquitto/config/mosquitto.conf",
+ BindMode.READ_ONLY);
+ super.start();
+ }
+
+ public String getBrokerHost() {
+ return getHost();
+ }
+
+ public Integer getBrokerPort() {
+ return getMappedPort(MOSQUITTO_PORT);
+ }
+
+ public String getBrokerUrl() {
+ return "tcp://" + getBrokerHost() + ":" + getBrokerPort();
+ }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoDevContainer.java
similarity index 51%
copy from streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
copy to streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoDevContainer.java
index 9d1801294..40e84fed8 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoDevContainer.java
@@ -15,28 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.integration.adapters;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+package org.apache.streampipes.integration.containers;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-public class AdaptersTest {
- public void testAdapter(AdapterTesterBase adapterTester) throws Exception {
- adapterTester.startAdapterService();
- AdapterDescription adapterDescription = adapterTester.prepareAdapter();
- adapterTester.startAdapter(adapterDescription);
- List<Map<String, Object>> data = adapterTester.generateData();
- adapterTester.validateData(data);
+public class MosquittoDevContainer extends MosquittoContainer {
+ public String getBrokerHost() {
+ return "localhost";
}
- @Test
- public void testPulsarAdapter() throws Exception {
- try (PulsarAdapterTester pulsarAdapterTester = new PulsarAdapterTester()) {
- testAdapter(pulsarAdapterTester);
- }
- }
}
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.conf b/streampipes-integration-tests/src/test/resources/mosquitto.conf
new file mode 100644
index 000000000..3c4318c1a
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.conf
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+allow_anonymous true
+listener 1883
+log_type all
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java
index de2110da9..6402d4b30 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ConnectionStorageImpl.java
@@ -98,7 +98,8 @@ public class ConnectionStorageImpl extends AbstractDao<Connection> implements
private Optional<JsonObject> getFrequentConnections(String query) {
try {
- return Optional.of((JsonObject) new JsonParser().parse(Request.Get(query).execute().returnContent().asString()));
+ var request = Utils.append(Request.Get(query));
+ return Optional.of((JsonObject) new JsonParser().parse(request.execute().returnContent().asString()));
} catch (IOException e) {
e.printStackTrace();
return Optional.empty();