You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/01/20 16:31:03 UTC
[plc4x] branch develop updated: - Switched the KNX Driver to support real subscriptions.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 7761653 - Switched the KNX Driver to support real subscriptions.
7761653 is described below
commit 77616530112876879d3b15c1a3085b16ea7f3160
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Jan 20 17:30:55 2020 +0100
- Switched the KNX Driver to support real subscriptions.
---
.../plc4x/java/knxnetip/field/KnxNetIpField.java | 6 +-
.../knxnetip/protocol/KnxNetIpProtocolLogic.java | 95 +++++++++++++++++++---
.../apache/plc4x/java/knxnetip/ManualKnxNetIp.java | 37 ++++++++-
3 files changed, 121 insertions(+), 17 deletions(-)
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
index c88bfa4..0f768a1 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
@@ -47,15 +47,15 @@ public class KnxNetIpField implements PlcField {
public static KnxNetIpField of(String fieldString) {
Matcher matcher = KNX_GROUP_ADDRESS_3_LEVEL.matcher(fieldString);
if(matcher.matches()) {
- return new KnxNetIpField(3, matcher.group("main"), null, null);
+ return new KnxNetIpField(3, matcher.group("mainGroup"), null, null);
}
matcher = KNX_GROUP_ADDRESS_2_LEVEL.matcher(fieldString);
if(matcher.matches()) {
- return new KnxNetIpField(2, matcher.group("main"), null, matcher.group("subGroup"));
+ return new KnxNetIpField(2, matcher.group("mainGroup"), null, matcher.group("subGroup"));
}
matcher = KNX_GROUP_ADDRESS_1_LEVEL.matcher(fieldString);
if(matcher.matches()) {
- return new KnxNetIpField(1, matcher.group("main"), matcher.group("middleGroup"), matcher.group("subGroup"));
+ return new KnxNetIpField(1, matcher.group("mainGroup"), matcher.group("middleGroup"), matcher.group("subGroup"));
}
throw new PlcInvalidFieldException("Unable to parse address: " + fieldString);
}
diff --git a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
index c78cc06..62456bc 100644
--- a/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
+++ b/sandbox/test-java-knxnetip-driver/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
@@ -20,6 +20,16 @@ package org.apache.plc4x.java.knxnetip.protocol;
import io.netty.channel.socket.DatagramChannel;
import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.PlcString;
+import org.apache.plc4x.java.api.value.PlcStruct;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.knxnetip.configuration.KnxNetIpConfiguration;
import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
@@ -38,15 +48,26 @@ import org.apache.plc4x.java.knxnetip.readwrite.types.KnxLayer;
import org.apache.plc4x.java.knxnetip.readwrite.types.Status;
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.generation.ReadBuffer;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
+import org.apache.plc4x.java.spi.messages.InternalPlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.model.InternalPlcSubscriptionHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetSocketAddress;
import java.time.Duration;
+import java.time.Instant;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
-public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> implements HasConfiguration<KnxNetIpConfiguration> {
+public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> implements HasConfiguration<KnxNetIpConfiguration>, PlcSubscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(KnxNetIpProtocolLogic.class);
@@ -61,6 +82,8 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
private byte groupAddressType;
private Ets5Model ets5Model;
+ private Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();
+
@Override
public void setConfiguration(KnxNetIpConfiguration configuration) {
if (configuration.knxprojFilePath != null) {
@@ -233,19 +256,31 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
final GroupAddress groupAddress = ets5Model.getGroupAddresses().get(destinationAddress);
if(groupAddress != null) {
+ LOGGER.info("Message from: '" + toString(sourceAddress) +
+ "' to: '" + destinationAddress + "'");
+
+ // Parse the payload depending on the type of the group-address.
ReadBuffer rawDataReader = new ReadBuffer(payload);
- final PlcValue datapoint = KnxDatapointIO.parse(rawDataReader, groupAddress.getType().getMainType(), groupAddress.getType().getSubType());
-
- LOGGER.info("Message from: '" + KnxNetIpProtocolLogic.toString(sourceAddress) + "'" +
- " to: '" + destinationAddress + "'" +
- "\n location: '" + groupAddress.getFunction().getSpaceName() + "'" +
- " function: '" + groupAddress.getFunction().getName() + "'" +
- " meaning: '" + groupAddress.getName() + "'" +
- " type: '" + groupAddress.getType().getName() + "'" +
- "\n value: '" + datapoint + "'"
- );
+ final PlcValue value = KnxDatapointIO.parse(rawDataReader,
+ groupAddress.getType().getMainType(), groupAddress.getType().getSubType());
+
+ // Assemble the plc4x return data-structure.
+ Map<String, PlcValue> dataPointMap = new HashMap<>();
+ dataPointMap.put("sourceAddress", new PlcString(toString(sourceAddress)));
+ dataPointMap.put("targetAddress", new PlcString(groupAddress.getGroupAddress()));
+ dataPointMap.put("name", new PlcString(groupAddress.getName()));
+ dataPointMap.put("type", new PlcString(groupAddress.getType().getName()));
+ dataPointMap.put("functionId", new PlcString(groupAddress.getFunction().getId()));
+ dataPointMap.put("functionName", new PlcString(groupAddress.getFunction().getName()));
+ dataPointMap.put("functionType", new PlcString(groupAddress.getFunction().getType()));
+ dataPointMap.put("functionSpace", new PlcString(groupAddress.getFunction().getSpaceName()));
+ dataPointMap.put("value", value);
+ final PlcStruct dataPoint = new PlcStruct(dataPointMap);
+
+ // Send the data-structure.
+ publishEvent("knxData", dataPoint);
} else {
- LOGGER.warn("Message from: '" + KnxNetIpProtocolLogic.toString(sourceAddress) + "'" +
+ LOGGER.warn("Message from: '" + toString(sourceAddress) + "'" +
" to unknown group address: '" + destinationAddress + "'" +
"\n payload: '" + Hex.encodeHexString(payload) + "'");
}
@@ -273,6 +308,41 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
// TODO Implement Closing on Protocol Level
}
+ @Override
+ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+ Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> values = new HashMap<>();
+ for (String fieldName : subscriptionRequest.getFieldNames()) {
+ values.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultPlcSubscriptionHandle(this)));
+ }
+ return CompletableFuture.completedFuture(
+ new DefaultPlcSubscriptionResponse((InternalPlcSubscriptionRequest) subscriptionRequest, values));
+ }
+
+ @Override
+ public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> collection) {
+ final DefaultPlcConsumerRegistration consumerRegistration =
+ new DefaultPlcConsumerRegistration(this, consumer, collection.toArray(new InternalPlcSubscriptionHandle[0]));
+ consumerIdMap.put(consumerRegistration.getConsumerHash(), consumer);
+ return consumerRegistration;
+ }
+
+ @Override
+ public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
+ DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) plcConsumerRegistration;
+ consumerIdMap.remove(consumerRegistration.getConsumerHash());
+ }
+
+ protected void publishEvent(String name, PlcValue plcValue) {
+ // Create a subscription event from the input.
+ final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
+ Collections.singletonMap(name, Pair.of(PlcResponseCode.OK, plcValue)));
+
+ // Send the subscription event to all listeners.
+ for (Consumer<PlcSubscriptionEvent> consumer : consumerIdMap.values()) {
+ consumer.accept(event);
+ }
+ }
+
protected static String toString(KNXAddress knxAddress) {
return knxAddress.getMainGroup() + "." + knxAddress.getMiddleGroup() + "." + knxAddress.getSubGroup();
}
@@ -290,4 +360,5 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
}
throw new RuntimeException("Unsupported Group Address Type " + groupAddress.getClass().getName());
}
+
}
diff --git a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
index 8df8d33..165cc02 100644
--- a/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
+++ b/sandbox/test-java-knxnetip-driver/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
@@ -20,15 +20,48 @@ package org.apache.plc4x.java.knxnetip;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.knxnetip.field.KnxNetIpField;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class ManualKnxNetIp {
public static void main(String[] args) throws Exception {
final PlcConnection connection = new PlcDriverManager().getConnection("knxnet-ip://192.168.42.11?knxproj-file-path=/Users/christofer.dutz/Projects/Apache/PLC4X-Documents/KNX/Stettiner%20Str.%2013/StettinerStr-Soll-Ist-Temperatur.knxproj");
- TimeUnit.SECONDS.sleep(300);
- connection.close();
+ // Make sure we hang up correctly when terminating.
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ throw new PlcRuntimeException("Error closing connection", e);
+ }
+ }));
+
+ // Create a new subscription request.
+ // The address and the name is just bogus as we're always returning everything.
+ // We will probably refactor the API in the near future.
+ final PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
+ .addEventField("knxData", "*/*/*")
+ .build();
+
+ // Register the subscription
+ // The timeout is also just a bogus value as the data is coming in actively
+ // We will probably refactor the API in the near future.
+ final PlcSubscriptionResponse subscriptionResponse =
+ subscriptionRequest.execute().get(1000, TimeUnit.MILLISECONDS);
+
+ // Register a callback which is called on new data being available.
+ final PlcSubscriptionHandle subscriptionHandle = subscriptionResponse.getSubscriptionHandle("knxData");
+ subscriptionHandle.register(knxData -> {
+ System.out.println(knxData.getTimestamp().toString() + " - " +
+ ((DefaultPlcSubscriptionEvent) knxData).getValues().get("knxData"));
+ });
}
}