You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/06/11 16:46:16 UTC

[incubator-streampipes-extensions] branch feature/adapter-flic-button created (now 915d24e)

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a change to branch feature/adapter-flic-button
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


      at 915d24e  New Adapter for Flic Button

This branch includes the following new commits:

     new 915d24e  New Adapter for Flic Button

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampipes-extensions] 01/01: New Adapter for Flic Button

Posted by fj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch feature/adapter-flic-button
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 915d24ee70c9efa8bc0d17fcaee53aba59c988a1
Author: Felix John <jo...@axantu.com>
AuthorDate: Thu Jun 11 16:57:18 2020 +0200

    New Adapter for Flic Button
---
 .../streampipes/connect/ConnectAdapterInit.java    |   4 +-
 .../connect/adapters/flic/FlicMQTTAdapter.java     | 143 +++++++++++++++++++++
 .../connect/adapters/flic/FlicOutput.java          |  45 +++++++
 .../connect/adapters/flic/FlicUtils.java           |  78 +++++++++++
 4 files changed, 269 insertions(+), 1 deletion(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index d347247..5a020ba 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect;
 import org.apache.streampipes.connect.adapters.image.set.ImageSetAdapter;
 import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
 import org.apache.streampipes.connect.adapters.iss.IssAdapter;
+import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
@@ -92,7 +93,8 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new Plc4xModbusAdapter())
             .add(new ImageStreamAdapter())
             .add(new ImageSetAdapter())
-            .add(new IssAdapter());
+            .add(new IssAdapter())
+            .add(new FlicMQTTAdapter());
 
     String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
     String masterUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerMasterUrl();
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
new file mode 100644
index 0000000..d577fcd
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import com.google.gson.Gson;
+import org.apache.streampipes.connect.utils.MqttConnectUtils;
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
+import org.apache.streampipes.connect.protocol.stream.MqttConfig;
+import org.apache.streampipes.connect.protocol.stream.MqttConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+
+import java.util.*;
+
+public class FlicMQTTAdapter extends SpecificDataStreamAdapter {
+
+    private MqttConsumer mqttConsumer;
+    private MqttConfig mqttConfig;
+    private Thread thread;
+
+    /**
+     * A unique id to identify the adapter type
+     */
+    public static final String ID = "org.apache.streampipes.connect.adapters.flic.mqtt";
+
+    /**
+     * Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory
+     */
+    public FlicMQTTAdapter() {
+    }
+
+    public FlicMQTTAdapter(SpecificAdapterStreamDescription adapterDescription) {
+        super(adapterDescription);
+    }
+
+
+    /**
+     * Describe the adapter adapter and define what user inputs are required.
+     * @return
+     */
+    @Override
+    public SpecificAdapterStreamDescription declareModel() {
+
+        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
+                .withLocales(Locales.EN)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .category(AdapterType.Energy)
+                .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
+                .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAlternativesOne(), MqttConnectUtils.getAlternativesTwo())
+                .build();
+        description.setAppId(ID);
+
+        return description;
+    }
+
+    /**
+     * Takes the user input and creates the event schema. The event schema describes the properties of the event stream.
+     * @param adapterDescription
+     * @return
+     * @throws AdapterException
+     */
+    @Override
+    public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException {
+        return FlicUtils.getFlicSchema();
+    }
+    @Override
+    public void startAdapter() throws AdapterException {
+        StaticPropertyExtractor extractor =
+                StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
+
+        this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor, "devices/flic/messages/events/");
+        this.mqttConsumer = new MqttConsumer(this.mqttConfig, new EventProcessor(adapterPipeline));
+
+        thread = new Thread(this.mqttConsumer);
+        thread.start();
+    }
+
+    @Override
+    public void stopAdapter() throws AdapterException {
+        this.mqttConsumer.close();
+    }
+
+    /**
+     * Required by StreamPipes return a new adapter instance by calling the constructor with SpecificAdapterStreamDescription
+     * @param adapterDescription
+     * @return
+     */
+    @Override
+    public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+        return new FlicMQTTAdapter(adapterDescription);
+    }
+
+
+    /**
+     * Required by StreamPipes. Return the id of the adapter
+     * @return
+     */
+    @Override
+    public String getId() {
+        return ID;
+    }
+
+    private class EventProcessor implements InternalEventProcessor<byte[]> {
+        private AdapterPipeline adapterPipeline;
+
+        public EventProcessor(AdapterPipeline adapterpipeline) {
+            this.adapterPipeline = adapterpipeline;
+        }
+
+        @Override
+        public void onEvent(byte[] payload) {
+            String s = new String(payload);
+            FlicOutput output = new Gson().fromJson(s, FlicOutput.class);
+            Map<String, Object> event = FlicUtils.getEvent(output);
+            adapterPipeline.process(event);
+        }
+    }
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java
new file mode 100644
index 0000000..317c33b
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import com.google.gson.annotations.SerializedName;
+import javax.annotation.Generated;
+
+@Generated("net.hexar.json2pojo")
+public class FlicOutput {
+
+    @SerializedName("ButtonID")
+    private String buttonID;
+
+    @SerializedName("ClickType")
+    private String clickType;
+
+    public String getButtonID() { return buttonID; }
+
+    public void setButtonID(String buttonID) { this.buttonID = buttonID; }
+
+    public String getClickType() {
+        return clickType;
+    }
+
+    public void setClickType(float voltage) {
+        this.clickType = clickType;
+    }
+
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java
new file mode 100644
index 0000000..afd45a1
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import org.apache.streampipes.connect.adapters.netio.model.NetioGlobalMeasure;
+import org.apache.streampipes.connect.adapters.netio.model.NetioPowerOutput;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.utils.Datatypes;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlicUtils {
+    public static final String TIMESTAMP_KEY = "timestamp";
+    public static final String BUTTON_ID_KEY = "button_id";
+    public static final String CLICK_TYPE_KEY = "click_type";
+
+    public static GuessSchema getFlicSchema() {
+
+        GuessSchema guessSchema = new GuessSchema();
+
+        EventSchema eventSchema = new EventSchema();
+        List<EventProperty> allProperties = new ArrayList<>();
+
+        allProperties.add(EpProperties.timestampProperty(TIMESTAMP_KEY));
+        allProperties.add(
+                PrimitivePropertyBuilder
+                        .create(Datatypes.String, BUTTON_ID_KEY)
+                        .label("Button ID")
+                        .description("The ID of the button")
+                        .build());
+        allProperties.add(
+                PrimitivePropertyBuilder
+                        .create(Datatypes.String, CLICK_TYPE_KEY)
+                        .label("Click Type")
+                        .description("Type of the click")
+                        .build());
+
+        eventSchema.setEventProperties(allProperties);
+        guessSchema.setEventSchema(eventSchema);
+        guessSchema.setPropertyProbabilityList(new ArrayList<>());
+        return guessSchema;
+    }
+
+    public static Map<String, Object> getEvent(FlicOutput output) {
+        Map<String, Object> event = new HashMap<>();
+
+        event.put(TIMESTAMP_KEY, System.currentTimeMillis());
+        event.put(BUTTON_ID_KEY, output.getButtonID());
+        event.put(CLICK_TYPE_KEY, output.getClickType());
+        return event;
+    }
+
+}