You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/06/11 14:57:40 UTC

[incubator-streampipes-extensions] 01/01: New Adapter for Flic Button

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch flic-button
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 915d24ee70c9efa8bc0d17fcaee53aba59c988a1
Author: Felix John <jo...@axantu.com>
AuthorDate: Thu Jun 11 16:57:18 2020 +0200

    New Adapter for Flic Button
---
 .../streampipes/connect/ConnectAdapterInit.java    |   4 +-
 .../connect/adapters/flic/FlicMQTTAdapter.java     | 143 +++++++++++++++++++++
 .../connect/adapters/flic/FlicOutput.java          |  45 +++++++
 .../connect/adapters/flic/FlicUtils.java           |  78 +++++++++++
 4 files changed, 269 insertions(+), 1 deletion(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index d347247..5a020ba 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect;
 import org.apache.streampipes.connect.adapters.image.set.ImageSetAdapter;
 import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
 import org.apache.streampipes.connect.adapters.iss.IssAdapter;
+import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
@@ -92,7 +93,8 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new Plc4xModbusAdapter())
             .add(new ImageStreamAdapter())
             .add(new ImageSetAdapter())
-            .add(new IssAdapter());
+            .add(new IssAdapter())
+            .add(new FlicMQTTAdapter());
 
     String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
     String masterUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerMasterUrl();
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
new file mode 100644
index 0000000..d577fcd
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import com.google.gson.Gson;
+import org.apache.streampipes.connect.utils.MqttConnectUtils;
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
+import org.apache.streampipes.connect.protocol.stream.MqttConfig;
+import org.apache.streampipes.connect.protocol.stream.MqttConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+
+import java.util.*;
+
+public class FlicMQTTAdapter extends SpecificDataStreamAdapter {
+
+    private MqttConsumer mqttConsumer;
+    private MqttConfig mqttConfig;
+    private Thread thread;
+
+    /**
+     * A unique id to identify the adapter type
+     */
+    public static final String ID = "org.apache.streampipes.connect.adapters.flic.mqtt";
+
+    /**
+     * Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory
+     */
+    public FlicMQTTAdapter() {
+    }
+
+    public FlicMQTTAdapter(SpecificAdapterStreamDescription adapterDescription) {
+        super(adapterDescription);
+    }
+
+
+    /**
+     * Describe the adapter adapter and define what user inputs are required.
+     * @return
+     */
+    @Override
+    public SpecificAdapterStreamDescription declareModel() {
+
+        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
+                .withLocales(Locales.EN)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .category(AdapterType.Energy)
+                .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
+                .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAlternativesOne(), MqttConnectUtils.getAlternativesTwo())
+                .build();
+        description.setAppId(ID);
+
+        return description;
+    }
+
+    /**
+     * Takes the user input and creates the event schema. The event schema describes the properties of the event stream.
+     * @param adapterDescription
+     * @return
+     * @throws AdapterException
+     */
+    @Override
+    public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException {
+        return FlicUtils.getFlicSchema();
+    }
+    @Override
+    public void startAdapter() throws AdapterException {
+        StaticPropertyExtractor extractor =
+                StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
+
+        this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor, "devices/flic/messages/events/");
+        this.mqttConsumer = new MqttConsumer(this.mqttConfig, new EventProcessor(adapterPipeline));
+
+        thread = new Thread(this.mqttConsumer);
+        thread.start();
+    }
+
+    @Override
+    public void stopAdapter() throws AdapterException {
+        this.mqttConsumer.close();
+    }
+
+    /**
+     * Required by StreamPipes return a new adapter instance by calling the constructor with SpecificAdapterStreamDescription
+     * @param adapterDescription
+     * @return
+     */
+    @Override
+    public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+        return new FlicMQTTAdapter(adapterDescription);
+    }
+
+
+    /**
+     * Required by StreamPipes. Return the id of the adapter
+     * @return
+     */
+    @Override
+    public String getId() {
+        return ID;
+    }
+
+    private class EventProcessor implements InternalEventProcessor<byte[]> {
+        private AdapterPipeline adapterPipeline;
+
+        public EventProcessor(AdapterPipeline adapterpipeline) {
+            this.adapterPipeline = adapterpipeline;
+        }
+
+        @Override
+        public void onEvent(byte[] payload) {
+            String s = new String(payload);
+            FlicOutput output = new Gson().fromJson(s, FlicOutput.class);
+            Map<String, Object> event = FlicUtils.getEvent(output);
+            adapterPipeline.process(event);
+        }
+    }
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java
new file mode 100644
index 0000000..317c33b
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import com.google.gson.annotations.SerializedName;
+import javax.annotation.Generated;
+
+@Generated("net.hexar.json2pojo")
+public class FlicOutput {
+
+    @SerializedName("ButtonID")
+    private String buttonID;
+
+    @SerializedName("ClickType")
+    private String clickType;
+
+    public String getButtonID() { return buttonID; }
+
+    public void setButtonID(String buttonID) { this.buttonID = buttonID; }
+
+    public String getClickType() {
+        return clickType;
+    }
+
+    public void setClickType(float voltage) {
+        this.clickType = clickType;
+    }
+
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java
new file mode 100644
index 0000000..afd45a1
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import org.apache.streampipes.connect.adapters.netio.model.NetioGlobalMeasure;
+import org.apache.streampipes.connect.adapters.netio.model.NetioPowerOutput;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.utils.Datatypes;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlicUtils {
+    public static final String TIMESTAMP_KEY = "timestamp";
+    public static final String BUTTON_ID_KEY = "button_id";
+    public static final String CLICK_TYPE_KEY = "click_type";
+
+    public static GuessSchema getFlicSchema() {
+
+        GuessSchema guessSchema = new GuessSchema();
+
+        EventSchema eventSchema = new EventSchema();
+        List<EventProperty> allProperties = new ArrayList<>();
+
+        allProperties.add(EpProperties.timestampProperty(TIMESTAMP_KEY));
+        allProperties.add(
+                PrimitivePropertyBuilder
+                        .create(Datatypes.String, BUTTON_ID_KEY)
+                        .label("Button ID")
+                        .description("The ID of the button")
+                        .build());
+        allProperties.add(
+                PrimitivePropertyBuilder
+                        .create(Datatypes.String, CLICK_TYPE_KEY)
+                        .label("Click Type")
+                        .description("Type of the click")
+                        .build());
+
+        eventSchema.setEventProperties(allProperties);
+        guessSchema.setEventSchema(eventSchema);
+        guessSchema.setPropertyProbabilityList(new ArrayList<>());
+        return guessSchema;
+    }
+
+    public static Map<String, Object> getEvent(FlicOutput output) {
+        Map<String, Object> event = new HashMap<>();
+
+        event.put(TIMESTAMP_KEY, System.currentTimeMillis());
+        event.put(BUTTON_ID_KEY, output.getButtonID());
+        event.put(CLICK_TYPE_KEY, output.getClickType());
+        return event;
+    }
+
+}