You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/06/11 14:57:40 UTC
[incubator-streampipes-extensions] 01/01: New Adapter for Flic
Button
This is an automated email from the ASF dual-hosted git repository.
fjohn pushed a commit to branch flic-button
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 915d24ee70c9efa8bc0d17fcaee53aba59c988a1
Author: Felix John <jo...@axantu.com>
AuthorDate: Thu Jun 11 16:57:18 2020 +0200
New Adapter for Flic Button
---
.../streampipes/connect/ConnectAdapterInit.java | 4 +-
.../connect/adapters/flic/FlicMQTTAdapter.java | 143 +++++++++++++++++++++
.../connect/adapters/flic/FlicOutput.java | 45 +++++++
.../connect/adapters/flic/FlicUtils.java | 78 +++++++++++
4 files changed, 269 insertions(+), 1 deletion(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index d347247..5a020ba 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect;
import org.apache.streampipes.connect.adapters.image.set.ImageSetAdapter;
import org.apache.streampipes.connect.adapters.image.stream.ImageStreamAdapter;
import org.apache.streampipes.connect.adapters.iss.IssAdapter;
+import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
@@ -92,7 +93,8 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
.add(new Plc4xModbusAdapter())
.add(new ImageStreamAdapter())
.add(new ImageSetAdapter())
- .add(new IssAdapter());
+ .add(new IssAdapter())
+ .add(new FlicMQTTAdapter());
String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
String masterUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerMasterUrl();
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
new file mode 100644
index 0000000..d577fcd
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import com.google.gson.Gson;
+import org.apache.streampipes.connect.utils.MqttConnectUtils;
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
+import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
+import org.apache.streampipes.connect.protocol.stream.MqttConfig;
+import org.apache.streampipes.connect.protocol.stream.MqttConsumer;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+
+import java.util.*;
+
+public class FlicMQTTAdapter extends SpecificDataStreamAdapter {
+
+  /**
+   * A unique id to identify the adapter type
+   */
+  public static final String ID = "org.apache.streampipes.connect.adapters.flic.mqtt";
+
+  /**
+   * Shared JSON parser. Gson instances are documented as thread-safe, so a single
+   * instance is reused instead of allocating one per incoming message.
+   */
+  private static final Gson GSON = new Gson();
+
+  private MqttConsumer mqttConsumer;
+  private MqttConfig mqttConfig;
+  private Thread thread;
+
+  /**
+   * Empty constructor and a constructor with SpecificAdapterStreamDescription are mandatory
+   */
+  public FlicMQTTAdapter() {
+  }
+
+  public FlicMQTTAdapter(SpecificAdapterStreamDescription adapterDescription) {
+    super(adapterDescription);
+  }
+
+  /**
+   * Describes the adapter and defines which user inputs are required: the MQTT broker URL
+   * and the access mode alternatives provided by {@link MqttConnectUtils}.
+   *
+   * @return the adapter description, with {@link #ID} used as both element id and app id
+   */
+  @Override
+  public SpecificAdapterStreamDescription declareModel() {
+
+    // NOTE(review): AdapterType.Energy looks inherited from another adapter; confirm that
+    // this is the intended category for a button.
+    SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
+            .withLocales(Locales.EN)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .category(AdapterType.Energy)
+            .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
+            .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAlternativesOne(), MqttConnectUtils.getAlternativesTwo())
+            .build();
+    description.setAppId(ID);
+
+    return description;
+  }
+
+  /**
+   * Returns the event schema of the adapter. The schema is fixed and does not depend on the
+   * user input; it is taken from {@link FlicUtils#getFlicSchema()}.
+   *
+   * @param adapterDescription the adapter description (not used)
+   * @return the schema describing the properties of the event stream
+   * @throws AdapterException declared by the overridden method; not thrown here
+   */
+  @Override
+  public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException {
+    return FlicUtils.getFlicSchema();
+  }
+
+  /**
+   * Builds the MQTT configuration from the user input and starts the consumer in its own thread.
+   *
+   * @throws AdapterException declared by the overridden method
+   */
+  @Override
+  public void startAdapter() throws AdapterException {
+    StaticPropertyExtractor extractor =
+            StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
+
+    // The second argument is presumably the MQTT topic to subscribe to; verify against MqttConnectUtils.
+    this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor, "devices/flic/messages/events/");
+    this.mqttConsumer = new MqttConsumer(this.mqttConfig, new EventProcessor(adapterPipeline));
+
+    thread = new Thread(this.mqttConsumer);
+    thread.start();
+  }
+
+  /**
+   * Closes the MQTT consumer. Does nothing when the adapter was never started, so stopping
+   * an adapter whose start failed or never happened no longer ends in a NullPointerException.
+   *
+   * @throws AdapterException declared by the overridden method
+   */
+  @Override
+  public void stopAdapter() throws AdapterException {
+    if (this.mqttConsumer != null) {
+      this.mqttConsumer.close();
+    }
+  }
+
+  /**
+   * Required by StreamPipes: returns a new adapter instance by calling the constructor with
+   * SpecificAdapterStreamDescription.
+   *
+   * @param adapterDescription the description the new instance is created from
+   * @return a new {@link FlicMQTTAdapter}
+   */
+  @Override
+  public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+    return new FlicMQTTAdapter(adapterDescription);
+  }
+
+  /**
+   * Required by StreamPipes. Returns the id of the adapter.
+   *
+   * @return {@link #ID}
+   */
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  /**
+   * Converts raw MQTT payloads into events and forwards them to the adapter pipeline.
+   * Declared static because it only needs the pipeline handed to its constructor.
+   */
+  private static class EventProcessor implements InternalEventProcessor<byte[]> {
+    private final AdapterPipeline adapterPipeline;
+
+    public EventProcessor(AdapterPipeline adapterpipeline) {
+      this.adapterPipeline = adapterpipeline;
+    }
+
+    /**
+     * Parses the payload as JSON into a {@link FlicOutput} and processes the resulting event.
+     * Messages without content are skipped; malformed JSON still propagates Gson's exception.
+     *
+     * @param payload the raw message bytes, decoded as UTF-8
+     */
+    @Override
+    public void onEvent(byte[] payload) {
+      if (payload == null || payload.length == 0) {
+        return;
+      }
+
+      // Decode with an explicit charset instead of the platform default.
+      String json = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
+      FlicOutput output = GSON.fromJson(json, FlicOutput.class);
+
+      // Gson yields null for blank input or a literal JSON null; there is no event to emit then.
+      if (output == null) {
+        return;
+      }
+
+      Map<String, Object> event = FlicUtils.getEvent(output);
+      adapterPipeline.process(event);
+    }
+  }
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java
new file mode 100644
index 0000000..317c33b
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicOutput.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import com.google.gson.annotations.SerializedName;
+import javax.annotation.Generated;
+
+/**
+ * JSON model of a single message published by a Flic button.
+ * The fields are bound to the JSON keys {@code ButtonID} and {@code ClickType}.
+ */
+@Generated("net.hexar.json2pojo")
+public class FlicOutput {
+
+  @SerializedName("ButtonID")
+  private String buttonID;
+
+  @SerializedName("ClickType")
+  private String clickType;
+
+  /**
+   * @return the value of the {@code ButtonID} key
+   */
+  public String getButtonID() {
+    return buttonID;
+  }
+
+  /**
+   * @param buttonID the new button id
+   */
+  public void setButtonID(String buttonID) {
+    this.buttonID = buttonID;
+  }
+
+  /**
+   * @return the value of the {@code ClickType} key
+   */
+  public String getClickType() {
+    return clickType;
+  }
+
+  /**
+   * Sets the click type. The parameter used to be an unrelated {@code float voltage} that was
+   * ignored, which turned the assignment into a self-assignment of the field.
+   *
+   * @param clickType the new click type
+   */
+  public void setClickType(String clickType) {
+    this.clickType = clickType;
+  }
+
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java
new file mode 100644
index 0000000..afd45a1
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapters.flic;
+
+import org.apache.streampipes.connect.adapters.netio.model.NetioGlobalMeasure;
+import org.apache.streampipes.connect.adapters.netio.model.NetioPowerOutput;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.utils.Datatypes;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers of the Flic adapter: the fixed event schema and the conversion of a parsed
+ * message into an event map. Both use the same runtime keys.
+ */
+public class FlicUtils {
+  public static final String TIMESTAMP_KEY = "timestamp";
+  public static final String BUTTON_ID_KEY = "button_id";
+  public static final String CLICK_TYPE_KEY = "click_type";
+
+  /**
+   * Creates the schema of a Flic event: a timestamp followed by two string properties,
+   * the button id and the click type.
+   *
+   * @return a guess schema holding the event schema and an empty property probability list
+   */
+  public static GuessSchema getFlicSchema() {
+    List<EventProperty> properties = new ArrayList<>();
+    properties.add(EpProperties.timestampProperty(TIMESTAMP_KEY));
+    properties.add(stringProperty(BUTTON_ID_KEY, "Button ID", "The ID of the button"));
+    properties.add(stringProperty(CLICK_TYPE_KEY, "Click Type", "Type of the click"));
+
+    EventSchema schema = new EventSchema();
+    schema.setEventProperties(properties);
+
+    GuessSchema result = new GuessSchema();
+    result.setEventSchema(schema);
+    result.setPropertyProbabilityList(new ArrayList<>());
+    return result;
+  }
+
+  /**
+   * Converts a parsed message into an event map keyed by the schema's runtime names.
+   * The timestamp is the system time at the moment of conversion, not a value from the message.
+   *
+   * @param output the parsed message; must not be null
+   * @return a new map with timestamp, button id and click type
+   */
+  public static Map<String, Object> getEvent(FlicOutput output) {
+    Map<String, Object> values = new HashMap<>();
+    values.put(TIMESTAMP_KEY, System.currentTimeMillis());
+    values.put(BUTTON_ID_KEY, output.getButtonID());
+    values.put(CLICK_TYPE_KEY, output.getClickType());
+    return values;
+  }
+
+  /** Builds a string-typed primitive property with the given runtime key, label and description. */
+  private static EventProperty stringProperty(String key, String label, String description) {
+    return PrimitivePropertyBuilder
+            .create(Datatypes.String, key)
+            .label(label)
+            .description(description)
+            .build();
+  }
+
+}