You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/02/04 15:41:54 UTC

[incubator-streampipes-extensions] branch netio updated (6254a33 -> f74ee7b)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch netio
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


    from 6254a33  Created NETIO REST adapter
     add 3ed27a4  Add missing icons to adapter
     new b9c296d  Merge branch 'dev' into netio
     new f74ee7b  Finished NETIO MQTT adapter

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streampipes/connect/ConnectAdapterInit.java    |   6 +-
 .../adapters/influxdb/InfluxDbSetAdapter.java      |   1 +
 .../adapters/influxdb/InfluxDbStreamAdapter.java   |   1 +
 .../connect/adapters/mysql/MySqlAdapter.java       |   2 +-
 .../connect/adapters/mysql/MySqlSetAdapter.java    |   1 +
 .../connect/adapters/mysql/MySqlStreamAdapter.java |   1 +
 .../connect/adapters/netio/NetioMQTTAdapter.java   |  88 +++++++----------
 .../connect/adapters/ros/RosBridgeAdapter.java     |   7 +-
 .../connect/adapters/ti/TISensorTag.java           |  36 +------
 .../connect/protocol/stream/MqttProtocol.java      |  30 +-----
 .../connect/utils/MqttConnectUtils.java            | 109 +++++++++++++++++++++
 .../strings.en                                     |   9 --
 12 files changed, 163 insertions(+), 128 deletions(-)
 create mode 100644 streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/utils/MqttConnectUtils.java


[incubator-streampipes-extensions] 02/02: Finished NETIO MQTT adapter

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch netio
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit f74ee7b5b3083874ca8271d2277f603e4baecf38
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Feb 4 16:41:15 2020 +0100

    Finished NETIO MQTT adapter
---
 .../streampipes/connect/ConnectAdapterInit.java    |   6 +-
 .../connect/adapters/netio/NetioMQTTAdapter.java   |  88 +++++++----------
 .../connect/adapters/ros/RosBridgeAdapter.java     |   7 +-
 .../connect/adapters/ti/TISensorTag.java           |  36 +------
 .../connect/protocol/stream/MqttProtocol.java      |  30 +-----
 .../connect/utils/MqttConnectUtils.java            | 109 +++++++++++++++++++++
 .../strings.en                                     |   9 --
 7 files changed, 158 insertions(+), 127 deletions(-)

diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index 1539227..b3e2e91 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect;
 
+import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
 import org.apache.streampipes.connect.adapters.ti.TISensorTag;
 import org.apache.streampipes.connect.protocol.set.HttpProtocol;
@@ -63,8 +64,8 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new MqttProtocol())
             .add(new HttpStreamProtocol())
             .add(new PulsarProtocol())
-
-            // Specific Adapters
+//
+//            // Specific Adapters
             .add(new GdeltAdapter())
             .add(new CoindeskBitcoinAdapter())
             .add(new IexCloudNewsAdapter())
@@ -82,6 +83,7 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new InfluxDbSetAdapter())
             .add(new TISensorTag())
             .add(new NetioRestAdapter())
+            .add(new NetioMQTTAdapter())
             .add(new Plc4xS7Adapter());
 
     String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioMQTTAdapter.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioMQTTAdapter.java
index dd43c53..f658a0f 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioMQTTAdapter.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/netio/NetioMQTTAdapter.java
@@ -18,32 +18,29 @@
 
 package org.apache.streampipes.connect.adapters.netio;
 
+import com.google.gson.Gson;
+import org.apache.streampipes.connect.utils.MqttConnectUtils;
 import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
+import org.apache.streampipes.connect.adapters.netio.model.NetioAllPowerOutputs;
+import org.apache.streampipes.connect.adapters.netio.model.NetioPowerOutput;
 import org.apache.streampipes.connect.protocol.stream.MqttConfig;
 import org.apache.streampipes.connect.protocol.stream.MqttConsumer;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 public class NetioMQTTAdapter extends SpecificDataStreamAdapter {
 
-
-
     private MqttConsumer mqttConsumer;
     private MqttConfig mqttConfig;
     private Thread thread;
@@ -54,20 +51,6 @@ public class NetioMQTTAdapter extends SpecificDataStreamAdapter {
     public static final String ID = "org.apache.streampipes.connect.adapters.netio.mqtt";
 
     /**
-     * Keys of user configuration parameters
-     */
-    private static final String ACCESS_MODE = "access-mode";
-    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-    private static final String USERNAME_ACCESS = "username-alternative";
-    private static final String USERNAME = "username";
-    private static final String PASSWORD = "password";
-
-    /**
-     * Values of user configuration parameters
-     */
-    private String ip;
-
-    /**
      * Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory
      */
     public NetioMQTTAdapter() {
@@ -89,14 +72,9 @@ public class NetioMQTTAdapter extends SpecificDataStreamAdapter {
                 .withLocales(Locales.EN)
                 .withAssets(Assets.DOCUMENTATION, Assets.ICON)
                 .category(AdapterType.Energy)
-                .requiredAlternatives(Labels.from(ACCESS_MODE, "Access Mode", ""),
-                        Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", "")),
-                        Alternatives.from(Labels.from(USERNAME_ACCESS, "Username/Password", ""),
-                                StaticProperties.group(Labels.withId("username-group"),
-                                        StaticProperties.stringFreeTextProperty(Labels.from(USERNAME,
-                                                "Username", "")),
-                                        StaticProperties.secretValue(Labels.from(PASSWORD,
-                                                "Password", "")))))
+                .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
+                .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAlternativesOne(), MqttConnectUtils.getAlternativesTwo())
+//                .requiredTextParameter(MqttConnectUtils.getTopicLabel())
                 .build();
         description.setAppId(ID);
 
@@ -116,6 +94,10 @@ public class NetioMQTTAdapter extends SpecificDataStreamAdapter {
     }
     @Override
     public void startAdapter() throws AdapterException {
+        StaticPropertyExtractor extractor =
+                StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
+
+        this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor, "devices/netio/messages/events/");
         this.mqttConsumer = new MqttConsumer(this.mqttConfig, new EventProcessor(adapterPipeline));
 
         thread = new Thread(this.mqttConsumer);
@@ -126,6 +108,7 @@ public class NetioMQTTAdapter extends SpecificDataStreamAdapter {
     public void stopAdapter() throws AdapterException {
         this.mqttConsumer.close();
     }
+
     /**
      * Required by StreamPipes return a new adapter instance by calling the constructor with SpecificAdapterStreamDescription
      * @param adapterDescription
@@ -146,27 +129,6 @@ public class NetioMQTTAdapter extends SpecificDataStreamAdapter {
         return ID;
     }
 
-    /**
-     * Extracts the user configuration from the SpecificAdapterStreamDescription and sets the local variales
-     * @param adapterDescription
-     */
-    private void getConfigurations(SpecificAdapterStreamDescription adapterDescription) {
-        StaticPropertyExtractor extractor =
-                StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
-
-        String brokerUrl = extractor.singleValueParameter("broker_url", String.class);
-        String topic = extractor.singleValueParameter("topic", String.class);
-        String selectedAlternative = extractor.selectedAlternativeInternalId("access_mode");
-
-        if (selectedAlternative.equals(ANONYMOUS_ACCESS)) {
-            mqttConfig = new MqttConfig(brokerUrl, topic);
-        } else {
-            String username = extractor.singleValueParameter(USERNAME, String.class);
-            String password = extractor.secretValue(PASSWORD);
-            mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
-        }
-    }
-
     private class EventProcessor implements InternalEventProcessor<byte[]> {
         private AdapterPipeline adapterPipeline;
 
@@ -176,13 +138,29 @@ public class NetioMQTTAdapter extends SpecificDataStreamAdapter {
 
         @Override
         public void onEvent(byte[] payload) {
-            Map<String, Object> result = parseEvent(new String(payload));
-            adapterPipeline.process(result);
+            List<Map<String, Object>> events = parseEvent(payload);
+
+            for (Map<String, Object> event : events) {
+                adapterPipeline.process(event);
+            }
         }
     }
 
-    public static Map<String, Object> parseEvent(String s) {
-       return new HashMap<>();
+    public static List<Map<String, Object>> parseEvent(byte[] input) {
+        List<Map<String, Object>> result = new ArrayList<>();
+
+        String s = new String(input);
+
+        NetioAllPowerOutputs allPowerOutputs = new Gson().fromJson(s, NetioAllPowerOutputs.class);
+
+        for (NetioPowerOutput output : allPowerOutputs.getPowerOutputs()) {
+            if (allPowerOutputs.getGobalMeasure() != null && output != null) {
+                Map<String, Object> event = NetioUtils.getEvent(allPowerOutputs.getGobalMeasure(), output);
+                result.add(event);
+            }
+        }
+
+        return result;
     }
 
 }
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ros/RosBridgeAdapter.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ros/RosBridgeAdapter.java
index 0044af3..b79431a 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ros/RosBridgeAdapter.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ros/RosBridgeAdapter.java
@@ -55,7 +55,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.stream.Collectors;
 
-public class RosBridgeAdapter extends SpecificDataStreamAdapter  implements ResolvesContainerProvidedOptions {
+public class RosBridgeAdapter extends SpecificDataStreamAdapter implements ResolvesContainerProvidedOptions {
 
     public static final String ID = "org.apache.streampipes.connect.adapters.ros";
 
@@ -90,10 +90,7 @@ public class RosBridgeAdapter extends SpecificDataStreamAdapter  implements Reso
                 .category(AdapterType.Manufacturing)
                 .requiredTextParameter(Labels.withId(ROS_HOST_KEY))
                 .requiredTextParameter(Labels.withId(ROS_PORT_KEY))
-                .requiredSingleValueSelectionFromContainer(Labels.withId(TOPIC_KEY), Arrays.asList(ROS_HOST_KEY,
-                        ROS_PORT_KEY))
-//                .requiredTextParameter(Labels.from(TOPIC_KEY, "Topic", "Example: /battery " +
-//                        "(Starts with /) "))
+                .requiredSingleValueSelectionFromContainer(Labels.withId(TOPIC_KEY), Arrays.asList(ROS_HOST_KEY, ROS_PORT_KEY))
                 .build();
 
         return  description;
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
index 485e05c..cffbf50 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampipes.connect.adapters.ti;
 
+import org.apache.streampipes.connect.utils.MqttConnectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.adapter.Adapter;
@@ -29,11 +30,9 @@ import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.SPSensor;
@@ -50,13 +49,6 @@ public class TISensorTag extends SpecificDataStreamAdapter {
 
     public static final String ID = "org.apache.streampipes.connect.adapters.ti";
 
-    private static final String ACCESS_MODE = "access_mode";
-    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-    private static final String USERNAME_ACCESS = "username-alternative";
-    private static final String USERNAME = "username";
-    private static final String PASSWORD = "password";
-
-
     private static final String TIMESTAMP = "timestamp";
     private static final String AMBIENT_TEMP = "ambientTemp";
     private static final String OBJECT_TEMP = "objectTemp";
@@ -93,17 +85,9 @@ public class TISensorTag extends SpecificDataStreamAdapter {
         SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID, "TI Sensor Tag", "")
                 .iconUrl("ti_sensor_tag.png")
                 .category(AdapterType.Environment, AdapterType.OpenData)
-                .requiredTextParameter(Labels.from("broker_url", "Broker URL",
-                        "Example: tcp://test-server.com:1883 (Protocol required. Port required)"))
-                .requiredAlternatives(Labels.from(ACCESS_MODE, "Access Mode", ""),
-                        Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", "")),
-                        Alternatives.from(Labels.from(USERNAME_ACCESS, "Username/Password", ""),
-                                StaticProperties.group(Labels.withId("username-group"),
-                                        StaticProperties.stringFreeTextProperty(Labels.from(USERNAME,
-                                                "Username", "")),
-                                        StaticProperties.secretValue(Labels.from(PASSWORD,
-                                                "Password", "")))))
-                .requiredTextParameter(Labels.from("topic", "Topic","Example: test/topic"))
+                .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
+                .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAlternativesOne(), MqttConnectUtils.getAlternativesTwo())
+                .requiredTextParameter(MqttConnectUtils.getTopicLabel())
                 .build();
 
         description.setAppId(ID);
@@ -129,17 +113,7 @@ public class TISensorTag extends SpecificDataStreamAdapter {
         StaticPropertyExtractor extractor =
                 StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
 
-        String brokerUrl = extractor.singleValueParameter("broker_url", String.class);
-        String topic = extractor.singleValueParameter("topic", String.class);
-        String selectedAlternative = extractor.selectedAlternativeInternalId("access_mode");
-
-        if (selectedAlternative.equals(ANONYMOUS_ACCESS)) {
-            mqttConfig = new MqttConfig(brokerUrl, topic);
-        } else {
-            String username = extractor.singleValueParameter(USERNAME, String.class);
-            String password = extractor.secretValue(PASSWORD);
-            mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
-        }
+        mqttConfig = MqttConnectUtils.getMqttConfig(extractor);
 
         return new TISensorTag(adapterDescription, mqttConfig);
     }
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/MqttProtocol.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/MqttProtocol.java
index 3ede601..db7b5c7 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/MqttProtocol.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/MqttProtocol.java
@@ -18,6 +18,7 @@
 package org.apache.streampipes.connect.protocol.stream;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.streampipes.connect.utils.MqttConnectUtils;
 import org.apache.streampipes.connect.SendToPipeline;
 import org.apache.streampipes.connect.adapter.exception.ParseException;
 import org.apache.streampipes.connect.adapter.model.generic.Format;
@@ -27,12 +28,9 @@ import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
-import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.AdapterSourceType;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Labels;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -66,17 +64,7 @@ public class MqttProtocol extends BrokerProtocol {
     StaticPropertyExtractor extractor =
             StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>());
 
-    String brokerUrl = extractor.singleValueParameter("broker_url", String.class);
-    String topic = extractor.singleValueParameter("topic", String.class);
-    String selectedAlternative = extractor.selectedAlternativeInternalId("access_mode");
-
-    if (selectedAlternative.equals(ANONYMOUS_ACCESS)) {
-      mqttConfig = new MqttConfig(brokerUrl, topic);
-    } else {
-      String username = extractor.singleValueParameter(USERNAME, String.class);
-      String password = extractor.secretValue(PASSWORD);
-      mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
-    }
+    mqttConfig = MqttConnectUtils.getMqttConfig(extractor);
 
     return new MqttProtocol(parser, format, mqttConfig);
   }
@@ -88,17 +76,9 @@ public class MqttProtocol extends BrokerProtocol {
             .iconUrl("mqtt.png")
             .category(AdapterType.Generic, AdapterType.Manufacturing)
             .sourceType(AdapterSourceType.STREAM)
-            .requiredTextParameter(Labels.from("broker_url", "Broker URL",
-                    "Example: tcp://test-server.com:1883 (Protocol required. Port required)"))
-            .requiredAlternatives(Labels.from(ACCESS_MODE, "Access Mode", ""),
-                    Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", "")),
-                    Alternatives.from(Labels.from(USERNAME_ACCESS, "Username/Password", ""),
-                            StaticProperties.group(Labels.withId("username-group"),
-                                    StaticProperties.stringFreeTextProperty(Labels.from(USERNAME,
-                                            "Username", "")),
-                                    StaticProperties.secretValue(Labels.from(PASSWORD,
-                                            "Password", "")))))
-            .requiredTextParameter(Labels.from("topic", "Topic","Example: test/topic"))
+            .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
+            .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAlternativesOne(), MqttConnectUtils.getAlternativesTwo())
+            .requiredTextParameter(MqttConnectUtils.getTopicLabel())
             .build();
   }
 
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/utils/MqttConnectUtils.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/utils/MqttConnectUtils.java
new file mode 100644
index 0000000..2bfca09
--- /dev/null
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/utils/MqttConnectUtils.java
@@ -0,0 +1,109 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.connect.utils;
+
+import org.apache.streampipes.connect.protocol.stream.MqttConfig;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Label;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class MqttConnectUtils {
+
+    /**
+     * Keys of user configuration parameters
+     */
+    public static final String ACCESS_MODE = "access-mode";
+    public static final String ANONYMOUS_ACCESS = "anonymous-alternative";
+    public static final String USERNAME_ACCESS = "username-alternative";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+
+    public static Label getAccessModeLabel() {
+        return Labels.from(ACCESS_MODE, "Access Mode", "");
+    }
+
+    public static Label getBrokerUrlLabel() {
+        return Labels.from("broker_url", "Broker URL",
+                "Example: tcp://test-server.com:1883 (Protocol required. Port required)");
+    }
+
+    public static Label getTopicLabel() {
+        return Labels.from("topic", "Topic","Example: test/topic");
+    }
+
+    public static StaticPropertyAlternative getAlternativesOne() {
+        return Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", ""));
+
+    }
+
+    public static StaticPropertyAlternative getAlternativesTwo() {
+        return Alternatives.from(Labels.from(USERNAME_ACCESS, "Username/Password", ""),
+                StaticProperties.group(Labels.from("username-group", "User Group", ""),
+                        StaticProperties.stringFreeTextProperty(Labels.from(USERNAME,
+                                "Username", "")),
+                        StaticProperties.secretValue(Labels.from(PASSWORD,
+                                "Password", ""))));
+
+    }
+
+
+//    public static StaticPropertyAlternative[] getAlternatives() {
+//        StaticPropertyAlternative[] result = {
+//                Alternatives.from(Labels.from(ANONYMOUS_ACCESS, "Unauthenticated", "")),
+//                Alternatives.from(Labels.from(USERNAME_ACCESS, "Username/Password", ""),
+//                        StaticProperties.group(Labels.from("username-group", "User Group", ""),
+//                                StaticProperties.stringFreeTextProperty(Labels.from(USERNAME,
+//                                        "Username", "")),
+//                                StaticProperties.secretValue(Labels.from(PASSWORD,
+//                                        "Password", ""))))
+//        };
+//        return result;
+//
+//    }
+
+    public static MqttConfig getMqttConfig(StaticPropertyExtractor extractor) {
+        return getMqttConfig(extractor, null);
+    }
+
+    public static MqttConfig getMqttConfig(StaticPropertyExtractor extractor, String topicInput) {
+        MqttConfig mqttConfig;
+        String brokerUrl = extractor.singleValueParameter("broker_url", String.class);
+
+        String topic;
+        if (topicInput == null) {
+            topic = extractor.singleValueParameter("topic", String.class);
+        } else {
+            topic = topicInput;
+        }
+
+        String selectedAlternative = extractor.selectedAlternativeInternalId(ACCESS_MODE);
+
+        if (selectedAlternative.equals(ANONYMOUS_ACCESS)) {
+            mqttConfig = new MqttConfig(brokerUrl, topic);
+        } else {
+            String username = extractor.singleValueParameter(USERNAME, String.class);
+            String password = extractor.secretValue(PASSWORD);
+            mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
+        }
+
+        return mqttConfig;
+    }
+
+}
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.netio.mqtt/strings.en b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.netio.mqtt/strings.en
index e5def51..9873d71 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.netio.mqtt/strings.en
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/resources/org.apache.streampipes.connect.adapters.netio.mqtt/strings.en
@@ -1,11 +1,2 @@
 org.apache.streampipes.connect.adapters.netio.mqtt.title=NETIO MQTT
 org.apache.streampipes.connect.adapters.netio.mqtt.description=Connect a NETIO power plugs over MQTT
-
-ROS_HOST_KEY.title=Ros Bridge
-ROS_HOST_KEY.description=Example: test-server.com (No protocol)
-
-ROS_PORT_KEY.title=Port
-ROS_PORT_KEY.description=Example: 9090
-
-TOPIC_KEY.title=Topic
-TOPIC_KEY.description=Example: /battery (Starts with /)
\ No newline at end of file


[incubator-streampipes-extensions] 01/02: Merge branch 'dev' into netio

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch netio
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit b9c296ddd7595634f0f1675bbbccb7a4b7b8436c
Merge: 6254a33 3ed27a4
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Feb 4 09:44:53 2020 +0100

    Merge branch 'dev' into netio

 .../streampipes/connect/adapters/influxdb/InfluxDbSetAdapter.java       | 1 +
 .../streampipes/connect/adapters/influxdb/InfluxDbStreamAdapter.java    | 1 +
 .../org/apache/streampipes/connect/adapters/mysql/MySqlAdapter.java     | 2 +-
 .../org/apache/streampipes/connect/adapters/mysql/MySqlSetAdapter.java  | 1 +
 .../apache/streampipes/connect/adapters/mysql/MySqlStreamAdapter.java   | 1 +
 5 files changed, 5 insertions(+), 1 deletion(-)