You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/02/10 14:00:48 UTC

[plc4x] branch develop updated: - Implemented the functionality to subscribe to individual group addresses and address ranges

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new a22d0da  - Implemented the functionality to subscribe to individual group addresses and address ranges
a22d0da is described below

commit a22d0da9e9e37934f9e78dab09b3687f817c3351
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Feb 10 14:33:37 2020 +0100

    - Implemented the functionality to subscribe to individual group addresses and address ranges
---
 .../plc4x/java/knxnetip/field/KnxNetIpField.java   |  35 ++++-
 .../knxnetip/model/KnxNetIpSubscriptionHandle.java |  78 +++++++++++
 .../knxnetip/protocol/KnxNetIpProtocolLogic.java   |  43 ++++--
 .../apache/plc4x/java/knxnetip/ManualKnxNetIp.java |   2 +-
 .../java/knxnetip/ManualKnxNetIpWithEts5.java      | 149 ---------------------
 5 files changed, 141 insertions(+), 166 deletions(-)

diff --git a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
index 0f768a1..bc864ad 100644
--- a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
+++ b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
@@ -20,12 +20,14 @@ package org.apache.plc4x.java.knxnetip.field;
 
 import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
 import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.knxnetip.ets5.model.GroupAddress;
 
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class KnxNetIpField implements PlcField {
 
+    private static final String WILDCARD = "*";
     private static final Pattern KNX_GROUP_ADDRESS_1_LEVEL =
         Pattern.compile("^(?<mainGroup>(\\d{1,5}|\\*))");
     private static final Pattern KNX_GROUP_ADDRESS_2_LEVEL =
@@ -45,17 +47,17 @@ public class KnxNetIpField implements PlcField {
     }
 
     public static KnxNetIpField of(String fieldString) {
-        Matcher matcher = KNX_GROUP_ADDRESS_3_LEVEL.matcher(fieldString);
+        Matcher matcher = KNX_GROUP_ADDRESS_1_LEVEL.matcher(fieldString);
         if(matcher.matches()) {
-            return new KnxNetIpField(3, matcher.group("mainGroup"), null, null);
+            return new KnxNetIpField(1, matcher.group("mainGroup"), null, null);
         }
         matcher = KNX_GROUP_ADDRESS_2_LEVEL.matcher(fieldString);
         if(matcher.matches()) {
             return new KnxNetIpField(2, matcher.group("mainGroup"), null, matcher.group("subGroup"));
         }
-        matcher = KNX_GROUP_ADDRESS_1_LEVEL.matcher(fieldString);
+        matcher = KNX_GROUP_ADDRESS_3_LEVEL.matcher(fieldString);
         if(matcher.matches()) {
-            return new KnxNetIpField(1, matcher.group("mainGroup"), matcher.group("middleGroup"), matcher.group("subGroup"));
+            return new KnxNetIpField(3, matcher.group("mainGroup"), matcher.group("middleGroup"), matcher.group("subGroup"));
         }
         throw new PlcInvalidFieldException("Unable to parse address: " + fieldString);
     }
@@ -83,4 +85,29 @@ public class KnxNetIpField implements PlcField {
         return subGroup;
     }
 
+    // As our fields can contain wildcards and complex matching logic,
+    // check whether a given GroupAddress is actually compatible with this field.
+    public boolean matchesGroupAddress(GroupAddress groupAddress) {
+        KnxNetIpField otherAddress = KnxNetIpField.of(groupAddress.getGroupAddress());
+        // If the levels don't match the whole address can't match.
+        if(otherAddress.getLevels() != getLevels()) {
+            return false;
+        }
+        // NOTE: This case fallthrough is intentional :-)
+        switch (getLevels()) {
+            case 3:
+                if(!WILDCARD.equals(getMiddleGroup()) && !getMiddleGroup().equals(otherAddress.getMiddleGroup())) {
+                    return false;
+                }
+            case 2:
+                if(!WILDCARD.equals(getSubGroup()) && !getSubGroup().equals(otherAddress.getSubGroup())) {
+                    return false;
+                }
+            case 1:
+                return WILDCARD.equals(getMainGroup()) || getMainGroup().equals(otherAddress.getMainGroup());
+            default:
+                return false;
+        }
+    }
+
 }
diff --git a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/model/KnxNetIpSubscriptionHandle.java b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/model/KnxNetIpSubscriptionHandle.java
new file mode 100644
index 0000000..2bf50cc
--- /dev/null
+++ b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/model/KnxNetIpSubscriptionHandle.java
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.knxnetip.model;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.plc4x.java.knxnetip.ets5.model.GroupAddress;
+import org.apache.plc4x.java.knxnetip.field.KnxNetIpField;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
+
+public class KnxNetIpSubscriptionHandle extends DefaultPlcSubscriptionHandle {
+
+    private final KnxNetIpField field;
+
+    public KnxNetIpSubscriptionHandle(PlcSubscriber plcSubscriber, KnxNetIpField field) {
+        super(plcSubscriber);
+        this.field = field;
+    }
+
+    public KnxNetIpField getField() {
+        return field;
+    }
+
+    public boolean matches(GroupAddress address) {
+        return field.matchesGroupAddress(address);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (!(o instanceof KnxNetIpSubscriptionHandle)) {
+            return false;
+        }
+
+        KnxNetIpSubscriptionHandle that = (KnxNetIpSubscriptionHandle) o;
+
+        return new EqualsBuilder()
+            .append(getField(), that.getField())
+            .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(17, 37)
+            .appendSuper(super.hashCode())
+            .append(getField())
+            .toHashCode();
+    }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this)
+            .append("field", field)
+            .toString();
+    }
+
+}
diff --git a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
index 82109b9..222d725 100644
--- a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
+++ b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
@@ -26,6 +26,7 @@ import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.api.value.PlcString;
@@ -35,6 +36,8 @@ import org.apache.plc4x.java.knxnetip.configuration.KnxNetIpConfiguration;
 import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
 import org.apache.plc4x.java.knxnetip.ets5.model.Ets5Model;
 import org.apache.plc4x.java.knxnetip.ets5.model.GroupAddress;
+import org.apache.plc4x.java.knxnetip.field.KnxNetIpField;
+import org.apache.plc4x.java.knxnetip.model.KnxNetIpSubscriptionHandle;
 import org.apache.plc4x.java.knxnetip.readwrite.KNXGroupAddress;
 import org.apache.plc4x.java.knxnetip.readwrite.KNXGroupAddress2Level;
 import org.apache.plc4x.java.knxnetip.readwrite.KNXGroupAddress3Level;
@@ -54,7 +57,6 @@ import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
 import org.apache.plc4x.java.spi.messages.InternalPlcSubscriptionRequest;
 import org.apache.plc4x.java.spi.messages.PlcSubscriber;
 import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
-import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
 import org.apache.plc4x.java.spi.model.InternalPlcSubscriptionHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,7 +85,7 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
     private byte groupAddressType;
     private Ets5Model ets5Model;
 
-    private Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();
+    private Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();
 
     @Override
     public void setConfiguration(KnxNetIpConfiguration configuration) {
@@ -284,7 +286,7 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
                             final PlcStruct dataPoint = new PlcStruct(dataPointMap);
 
                             // Send the data-structure.
-                            publishEvent("knxData", dataPoint);
+                            publishEvent(groupAddress, dataPoint);
                         } else {
                             LOGGER.warn("Message from: '" + toString(sourceAddress) + "'" +
                                 " to unknown group address: '" + destinationAddress + "'" +
@@ -318,7 +320,13 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
     public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
         Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> values = new HashMap<>();
         for (String fieldName : subscriptionRequest.getFieldNames()) {
-            values.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultPlcSubscriptionHandle(this)));
+            final PlcField field = subscriptionRequest.getField(fieldName);
+            if(!(field instanceof KnxNetIpField)) {
+                values.put(fieldName, new ImmutablePair<>(PlcResponseCode.INVALID_ADDRESS, null));
+            } else {
+                values.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK,
+                    new KnxNetIpSubscriptionHandle(this, (KnxNetIpField) field)));
+            }
         }
         return CompletableFuture.completedFuture(
             new DefaultPlcSubscriptionResponse((InternalPlcSubscriptionRequest) subscriptionRequest, values));
@@ -328,24 +336,35 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
     public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> collection) {
         final DefaultPlcConsumerRegistration consumerRegistration =
             new DefaultPlcConsumerRegistration(this, consumer, collection.toArray(new InternalPlcSubscriptionHandle[0]));
-        consumerIdMap.put(consumerRegistration.getConsumerHash(), consumer);
+        consumers.put(consumerRegistration, consumer);
         return consumerRegistration;
     }
 
     @Override
     public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
         DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) plcConsumerRegistration;
-        consumerIdMap.remove(consumerRegistration.getConsumerHash());
+        consumers.remove(consumerRegistration);
     }
 
-    protected void publishEvent(String name, PlcValue plcValue) {
+    protected void publishEvent(GroupAddress groupAddress, PlcValue plcValue) {
         // Create a subscription event from the input.
         final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
-            Collections.singletonMap(name, Pair.of(PlcResponseCode.OK, plcValue)));
-
-        // Send the subscription event to all listeners.
-        for (Consumer<PlcSubscriptionEvent> consumer : consumerIdMap.values()) {
-            consumer.accept(event);
+            Collections.singletonMap("knxData", Pair.of(PlcResponseCode.OK, plcValue)));
+
+        // Try sending the subscription event to all listeners.
+        for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
+            final DefaultPlcConsumerRegistration registration = entry.getKey();
+            final Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
+            // Publish the event to this consumer only if the data point matches one of its subscriptions.
+            for (InternalPlcSubscriptionHandle handle : registration.getAssociatedHandles()) {
+                if(handle instanceof KnxNetIpSubscriptionHandle) {
+                    KnxNetIpSubscriptionHandle subscriptionHandle = (KnxNetIpSubscriptionHandle) handle;
+                    // Check if the subscription matches this current event.
+                    if (subscriptionHandle.getField().matchesGroupAddress(groupAddress)) {
+                        consumer.accept(event);
+                    }
+                }
+            }
         }
     }
 
diff --git a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java b/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
index c676999..02c4344 100644
--- a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
+++ b/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
@@ -52,7 +52,7 @@ public class ManualKnxNetIp {
         // The address and the name is just bogus as we're always returning everything.
         // We will probably refactor the API in the near future.
         final PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
-            .addEventField("knxData", "*/*/*")
+            .addEventField("knxData", "*/*/10")
             .build();
 
         // Register the subscription
diff --git a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java b/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
deleted file mode 100644
index 31e604a..0000000
--- a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.java.knxnetip;
-
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.knxnetip.readwrite.*;
-import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
-import org.apache.plc4x.java.knxnetip.ets5.model.Ets5Model;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
-
-public class ManualKnxNetIpWithEts5 {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ManualKnxNetIpWithEts5.class);
-
-    private final InetAddress gatewayInetAddress;
-    private final Ets5Model ets5Model;
-    private final byte groupAddressType;
-
-    private ManualKnxNetIpWithEts5(String gatewayAddress, String knxprojFilePath) throws UnknownHostException {
-        gatewayInetAddress = InetAddress.getByName(gatewayAddress);
-        ets5Model = new Ets5Parser().parse(new File(knxprojFilePath));
-        groupAddressType = ets5Model.getGroupAddressType();
-    }
-
-    private void start() throws PlcConnectionException {
-        /*
-        ChannelFactory channelFactory = new UdpSocketChannelFactory(
-            gatewayInetAddress, KnxNetIpConnection.KNXNET_IP_PORT);
-
-        NettyPlcConnection connection = new KnxNetIpConnection(channelFactory, "",
-            new PlcMessageToMessageCodec<KNXNetIPMessage, PlcRequestContainer>() {
-                @Override
-                protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) throws Exception {
-                    // Ignore for now ...
-                }
-
-                @Override
-                protected void decode(ChannelHandlerContext ctx, KNXNetIPMessage packet, List<Object> out) throws Exception {
-                    if(packet instanceof TunnelingRequest) {
-                        TunnelingRequest request = (TunnelingRequest) packet;
-                        CEMI cemiPayload = request.getCemi();
-                        if(cemiPayload instanceof CEMIBusmonInd) {
-                            CEMIBusmonInd cemiBusmonInd = (CEMIBusmonInd) cemiPayload;
-                            if(cemiBusmonInd.getCemiFrame() instanceof CEMIFrameData) {
-                                CEMIFrameData cemiDataFrame = (CEMIFrameData) cemiBusmonInd.getCemiFrame();
-
-                                // The first byte is actually just 6 bit long, but we'll treat it as a full one.
-                                // So here we create a byte array containing the first and all the following bytes.
-                                byte[] payload = new byte[1 + cemiDataFrame.getData().length];
-                                payload[0] = cemiDataFrame.getDataFirstByte();
-                                System.arraycopy(cemiDataFrame.getData(), 0, payload, 1, cemiDataFrame.getData().length);
-
-                                final KNXAddress sourceAddress = cemiDataFrame.getSourceAddress();
-                                final byte[] destinationGroupAddress = cemiDataFrame.getDestinationAddress();
-
-                                ReadBuffer addressReadBuffer = new ReadBuffer(destinationGroupAddress);
-                                // Decode the group address depending on the project settings.
-                                KNXGroupAddress destinationAddress =
-                                    KNXGroupAddressIO.staticParse(addressReadBuffer, groupAddressType);
-                                final GroupAddress groupAddress = ets5Model.getGroupAddresses().get(destinationAddress);
-
-                                ReadBuffer rawDataReader = new ReadBuffer(payload);
-
-                                final KnxDatapoint datapoint = KnxDatapointIO.staticParse(rawDataReader, groupAddress.getType().getMainType(), groupAddress.getType().getSubType());
-                                final String jsonDatapoint = datapoint.toString(ToStringStyle.JSON_STYLE);
-
-                                if("Isttemperatur".equals(groupAddress.getName())) {
-                                    LOGGER.info("Message from: '" + ManualKnxNetIpWithEts5.toString(sourceAddress) + "'" +
-                                        " to: '" + ManualKnxNetIpWithEts5.toString(destinationAddress) + "'" +
-                                        "\n location: '" + groupAddress.getFunction().getSpaceName() + "'" +
-                                        " function: '" + groupAddress.getFunction().getName() + "'" +
-                                        " meaning: '" + groupAddress.getName() + "'" +
-                                        " type: '" + groupAddress.getType().getName() + "'" +
-                                        "\n value: '" + jsonDatapoint + "'"
-                                    );
-                                }
-                            } else if (cemiBusmonInd.getCemiFrame() instanceof CEMIFrameAck){
-                                // Just ignore this ...
-                            } else {
-                                System.out.println(packet);
-                            }
-                        } else {
-                            System.out.println(packet);
-                        }
-                    } else {
-                        System.out.println(packet);
-                    }
-                }
-            });
-
-        connection.connect();
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            try {
-                connection.close();
-            } catch (PlcConnectionException e) {
-                // Just ignore this.
-            }
-        }));
-        */
-    }
-
-    protected static String toString(KNXAddress knxAddress) {
-        return knxAddress.getMainGroup() + "." + knxAddress.getMiddleGroup() + "." + knxAddress.getSubGroup();
-    }
-
-    protected static String toString(KNXGroupAddress groupAddress) {
-        if(groupAddress instanceof KNXGroupAddress3Level) {
-            KNXGroupAddress3Level level3 = (KNXGroupAddress3Level) groupAddress;
-            return level3.getMainGroup() + "/" + level3.getMiddleGroup() + "/" + level3.getSubGroup();
-        } else if(groupAddress instanceof KNXGroupAddress2Level) {
-            KNXGroupAddress2Level level2 = (KNXGroupAddress2Level) groupAddress;
-            return level2.getMainGroup() + "/" + level2.getSubGroup();
-        } else if(groupAddress instanceof KNXGroupAddressFreeLevel) {
-            KNXGroupAddressFreeLevel free = (KNXGroupAddressFreeLevel) groupAddress;
-            return free.getSubGroup() + "";
-        }
-        throw new RuntimeException("Unsupported Group Address Type " + groupAddress.getClass().getName());
-    }
-
-    public static void main(String[] args) throws Exception {
-        ManualKnxNetIpWithEts5 connection = new ManualKnxNetIpWithEts5("192.168.42.11",
-            "/Users/christofer.dutz/Projects/Apache/PLC4X-Documents/KNX/Stettiner Str. 13/StettinerStr-Soll-Ist-Temperatur.knxproj");
-        connection.start();
-        TimeUnit.SECONDS.sleep(3000);
-    }
-
-}