You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/01/20 16:31:03 UTC

[plc4x] branch develop updated: - Switched the KNX Driver to support real subscriptions.

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7761653  - Switched the KNX Driver to support real subscriptions.
7761653 is described below

commit 77616530112876879d3b15c1a3085b16ea7f3160
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Jan 20 17:30:55 2020 +0100

    - Switched the KNX Driver to support real subscriptions.
---
 .../plc4x/java/knxnetip/field/KnxNetIpField.java   |  6 +-
 .../knxnetip/protocol/KnxNetIpProtocolLogic.java   | 95 +++++++++++++++++++---
 .../apache/plc4x/java/knxnetip/ManualKnxNetIp.java | 37 ++++++++-
 3 files changed, 121 insertions(+), 17 deletions(-)

diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
index c88bfa4..0f768a1 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
@@ -47,15 +47,15 @@ public class KnxNetIpField implements PlcField {
     public static KnxNetIpField of(String fieldString) {
         Matcher matcher = KNX_GROUP_ADDRESS_3_LEVEL.matcher(fieldString);
         if(matcher.matches()) {
-            return new KnxNetIpField(3, matcher.group("main"), null, null);
+            return new KnxNetIpField(3, matcher.group("mainGroup"), null, null);
         }
         matcher = KNX_GROUP_ADDRESS_2_LEVEL.matcher(fieldString);
         if(matcher.matches()) {
-            return new KnxNetIpField(2, matcher.group("main"), null, matcher.group("subGroup"));
+            return new KnxNetIpField(2, matcher.group("mainGroup"), null, matcher.group("subGroup"));
         }
         matcher = KNX_GROUP_ADDRESS_1_LEVEL.matcher(fieldString);
         if(matcher.matches()) {
-            return new KnxNetIpField(1, matcher.group("main"), matcher.group("middleGroup"), matcher.group("subGroup"));
+            return new KnxNetIpField(1, matcher.group("mainGroup"), matcher.group("middleGroup"), matcher.group("subGroup"));
         }
         throw new PlcInvalidFieldException("Unable to parse address: " + fieldString);
     }
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
index c78cc06..62456bc 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
@@ -20,6 +20,16 @@ package org.apache.plc4x.java.knxnetip.protocol;
 
 import io.netty.channel.socket.DatagramChannel;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.PlcString;
+import org.apache.plc4x.java.api.value.PlcStruct;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.knxnetip.configuration.KnxNetIpConfiguration;
 import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
@@ -38,15 +48,26 @@ import org.apache.plc4x.java.knxnetip.readwrite.types.KnxLayer;
 import org.apache.plc4x.java.knxnetip.readwrite.types.Status;
 import org.apache.plc4x.java.spi.configuration.HasConfiguration;
 import org.apache.plc4x.java.spi.generation.ReadBuffer;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
+import org.apache.plc4x.java.spi.messages.InternalPlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.model.InternalPlcSubscriptionHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
-public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> implements HasConfiguration<KnxNetIpConfiguration> {
+public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> implements HasConfiguration<KnxNetIpConfiguration>, PlcSubscriber {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KnxNetIpProtocolLogic.class);
 
@@ -61,6 +82,8 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
     private byte groupAddressType;
     private Ets5Model ets5Model;
 
+    private Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();
+
     @Override
     public void setConfiguration(KnxNetIpConfiguration configuration) {
         if (configuration.knxprojFilePath != null) {
@@ -233,19 +256,31 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
                         final GroupAddress groupAddress = ets5Model.getGroupAddresses().get(destinationAddress);
 
                         if(groupAddress != null) {
+                            LOGGER.info("Message from: '" + toString(sourceAddress) +
+                                "' to: '" + destinationAddress + "'");
+
+                            // Parse the payload depending on the type of the group-address.
                             ReadBuffer rawDataReader = new ReadBuffer(payload);
-                            final PlcValue datapoint = KnxDatapointIO.parse(rawDataReader, groupAddress.getType().getMainType(), groupAddress.getType().getSubType());
-
-                            LOGGER.info("Message from: '" + KnxNetIpProtocolLogic.toString(sourceAddress) + "'" +
-                                " to: '" + destinationAddress + "'" +
-                                "\n location: '" + groupAddress.getFunction().getSpaceName() + "'" +
-                                " function: '" + groupAddress.getFunction().getName() + "'" +
-                                " meaning: '" + groupAddress.getName() + "'" +
-                                " type: '" + groupAddress.getType().getName() + "'" +
-                                "\n value: '" + datapoint + "'"
-                            );
+                            final PlcValue value = KnxDatapointIO.parse(rawDataReader,
+                                groupAddress.getType().getMainType(), groupAddress.getType().getSubType());
+
+                            // Assemble the plc4x return data-structure.
+                            Map<String, PlcValue> dataPointMap = new HashMap<>();
+                            dataPointMap.put("sourceAddress", new PlcString(toString(sourceAddress)));
+                            dataPointMap.put("targetAddress", new PlcString(groupAddress.getGroupAddress()));
+                            dataPointMap.put("name", new PlcString(groupAddress.getName()));
+                            dataPointMap.put("type", new PlcString(groupAddress.getType().getName()));
+                            dataPointMap.put("functionId", new PlcString(groupAddress.getFunction().getId()));
+                            dataPointMap.put("functionName", new PlcString(groupAddress.getFunction().getName()));
+                            dataPointMap.put("functionType", new PlcString(groupAddress.getFunction().getType()));
+                            dataPointMap.put("functionSpace", new PlcString(groupAddress.getFunction().getSpaceName()));
+                            dataPointMap.put("value", value);
+                            final PlcStruct dataPoint = new PlcStruct(dataPointMap);
+
+                            // Send the data-structure.
+                            publishEvent("knxData", dataPoint);
                         } else {
-                            LOGGER.warn("Message from: '" + KnxNetIpProtocolLogic.toString(sourceAddress) + "'" +
+                            LOGGER.warn("Message from: '" + toString(sourceAddress) + "'" +
                                 " to unknown group address: '" + destinationAddress + "'" +
                                 "\n payload: '" + Hex.encodeHexString(payload) + "'");
                         }
@@ -273,6 +308,41 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
         // TODO Implement Closing on Protocol Level
     }
 
+    @Override
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> values = new HashMap<>();
+        for (String fieldName : subscriptionRequest.getFieldNames()) {
+            values.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultPlcSubscriptionHandle(this)));
+        }
+        return CompletableFuture.completedFuture(
+            new DefaultPlcSubscriptionResponse((InternalPlcSubscriptionRequest) subscriptionRequest, values));
+    }
+
+    @Override
+    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> collection) {
+        final DefaultPlcConsumerRegistration consumerRegistration =
+            new DefaultPlcConsumerRegistration(this, consumer, collection.toArray(new InternalPlcSubscriptionHandle[0]));
+        consumerIdMap.put(consumerRegistration.getConsumerHash(), consumer);
+        return consumerRegistration;
+    }
+
+    @Override
+    public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
+        DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) plcConsumerRegistration;
+        consumerIdMap.remove(consumerRegistration.getConsumerHash());
+    }
+
+    protected void publishEvent(String name, PlcValue plcValue) {
+        // Create a subscription event from the input.
+        final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
+            Collections.singletonMap(name, Pair.of(PlcResponseCode.OK, plcValue)));
+
+        // Send the subscription event to all listeners.
+        for (Consumer<PlcSubscriptionEvent> consumer : consumerIdMap.values()) {
+            consumer.accept(event);
+        }
+    }
+
     protected static String toString(KNXAddress knxAddress) {
         return knxAddress.getMainGroup() + "." + knxAddress.getMiddleGroup() + "." + knxAddress.getSubGroup();
     }
@@ -290,4 +360,5 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
         }
         throw new RuntimeException("Unsupported Group Address Type " + groupAddress.getClass().getName());
     }
+
 }
diff --git a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
index 8df8d33..165cc02 100644
--- a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
+++ b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
@@ -20,15 +20,48 @@ package org.apache.plc4x.java.knxnetip;
 
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.knxnetip.field.KnxNetIpField;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 public class ManualKnxNetIp {
 
     public static void main(String[] args) throws Exception {
         final PlcConnection connection = new PlcDriverManager().getConnection("knxnet-ip://192.168.42.11?knxproj-file-path=/Users/christofer.dutz/Projects/Apache/PLC4X-Documents/KNX/Stettiner%20Str.%2013/StettinerStr-Soll-Ist-Temperatur.knxproj");
-        TimeUnit.SECONDS.sleep(300);
-        connection.close();
+        // Make sure we hang up correctly when terminating.
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                connection.close();
+            } catch (Exception  e) {
+                throw new PlcRuntimeException("Error closing connection", e);
+            }
+        }));
+
+        // Create a new subscription request.
+        // The address and the name is just bogus as we're always returning everything.
+        // We will probably refactor the API in the near future.
+        final PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
+            .addEventField("knxData", "*/*/*")
+            .build();
+
+        // Register the subscription
+        // The timeout is also just a bogus value as the data is coming in actively
+        // We will probably refactor the API in the near future.
+        final PlcSubscriptionResponse subscriptionResponse =
+            subscriptionRequest.execute().get(1000, TimeUnit.MILLISECONDS);
+
+        // Register a callback which is called on new data being available.
+        final PlcSubscriptionHandle subscriptionHandle = subscriptionResponse.getSubscriptionHandle("knxData");
+        subscriptionHandle.register(knxData -> {
+            System.out.println(knxData.getTimestamp().toString() + " - " +
+                ((DefaultPlcSubscriptionEvent) knxData).getValues().get("knxData"));
+        });
     }
 
 }