You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/02/10 14:00:48 UTC
[plc4x] branch develop updated: - Implemented the functionality to
subscribe to individual group addresses and address ranges
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new a22d0da - Implemented the functionality to subscribe to individual group addresses and address ranges
a22d0da is described below
commit a22d0da9e9e37934f9e78dab09b3687f817c3351
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Feb 10 14:33:37 2020 +0100
- Implemented the functionality to subscribe to individual group addresses and address ranges
---
.../plc4x/java/knxnetip/field/KnxNetIpField.java | 35 ++++-
.../knxnetip/model/KnxNetIpSubscriptionHandle.java | 78 +++++++++++
.../knxnetip/protocol/KnxNetIpProtocolLogic.java | 43 ++++--
.../apache/plc4x/java/knxnetip/ManualKnxNetIp.java | 2 +-
.../java/knxnetip/ManualKnxNetIpWithEts5.java | 149 ---------------------
5 files changed, 141 insertions(+), 166 deletions(-)
diff --git a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
index 0f768a1..bc864ad 100644
--- a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
+++ b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/field/KnxNetIpField.java
@@ -20,12 +20,14 @@ package org.apache.plc4x.java.knxnetip.field;
import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.knxnetip.ets5.model.GroupAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class KnxNetIpField implements PlcField {
+ private static final String WILDCARD = "*";
private static final Pattern KNX_GROUP_ADDRESS_1_LEVEL =
Pattern.compile("^(?<mainGroup>(\\d{1,5}|\\*))");
private static final Pattern KNX_GROUP_ADDRESS_2_LEVEL =
@@ -45,17 +47,17 @@ public class KnxNetIpField implements PlcField {
}
public static KnxNetIpField of(String fieldString) {
- Matcher matcher = KNX_GROUP_ADDRESS_3_LEVEL.matcher(fieldString);
+ Matcher matcher = KNX_GROUP_ADDRESS_1_LEVEL.matcher(fieldString);
if(matcher.matches()) {
- return new KnxNetIpField(3, matcher.group("mainGroup"), null, null);
+ return new KnxNetIpField(1, matcher.group("mainGroup"), null, null);
}
matcher = KNX_GROUP_ADDRESS_2_LEVEL.matcher(fieldString);
if(matcher.matches()) {
return new KnxNetIpField(2, matcher.group("mainGroup"), null, matcher.group("subGroup"));
}
- matcher = KNX_GROUP_ADDRESS_1_LEVEL.matcher(fieldString);
+ matcher = KNX_GROUP_ADDRESS_3_LEVEL.matcher(fieldString);
if(matcher.matches()) {
- return new KnxNetIpField(1, matcher.group("mainGroup"), matcher.group("middleGroup"), matcher.group("subGroup"));
+ return new KnxNetIpField(3, matcher.group("mainGroup"), matcher.group("middleGroup"), matcher.group("subGroup"));
}
throw new PlcInvalidFieldException("Unable to parse address: " + fieldString);
}
@@ -83,4 +85,29 @@ public class KnxNetIpField implements PlcField {
return subGroup;
}
+ // As our fields can contain wildcards and complex matching logic,
+ // do a check if a given GroupAddress is actually compatible with this field.
+ public boolean matchesGroupAddress(GroupAddress groupAddress) {
+ KnxNetIpField otherAddress = KnxNetIpField.of(groupAddress.getGroupAddress());
+ // If the levels don't match the whole address can't match.
+ if(otherAddress.getLevels() != getLevels()) {
+ return false;
+ }
+ // NOTE: This case fallthrough is intentional :-)
+ switch (getLevels()) {
+ case 3:
+ if(!WILDCARD.equals(getMiddleGroup()) && !getMiddleGroup().equals(otherAddress.getMiddleGroup())) {
+ return false;
+ }
+ case 2:
+ if(!WILDCARD.equals(getSubGroup()) && !getSubGroup().equals(otherAddress.getSubGroup())) {
+ return false;
+ }
+ case 1:
+ return WILDCARD.equals(getMainGroup()) || getMainGroup().equals(otherAddress.getMainGroup());
+ default:
+ return false;
+ }
+ }
+
}
diff --git a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/model/KnxNetIpSubscriptionHandle.java b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/model/KnxNetIpSubscriptionHandle.java
new file mode 100644
index 0000000..2bf50cc
--- /dev/null
+++ b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/model/KnxNetIpSubscriptionHandle.java
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.knxnetip.model;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.plc4x.java.knxnetip.ets5.model.GroupAddress;
+import org.apache.plc4x.java.knxnetip.field.KnxNetIpField;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
+
+public class KnxNetIpSubscriptionHandle extends DefaultPlcSubscriptionHandle {
+
+ private final KnxNetIpField field;
+
+ public KnxNetIpSubscriptionHandle(PlcSubscriber plcSubscriber, KnxNetIpField field) {
+ super(plcSubscriber);
+ this.field = field;
+ }
+
+ public KnxNetIpField getField() {
+ return field;
+ }
+
+ public boolean matches(GroupAddress address) {
+ return field.matchesGroupAddress(address);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof KnxNetIpSubscriptionHandle)) {
+ return false;
+ }
+
+ KnxNetIpSubscriptionHandle that = (KnxNetIpSubscriptionHandle) o;
+
+ return new EqualsBuilder()
+ .append(getField(), that.getField())
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .appendSuper(super.hashCode())
+ .append(getField())
+ .toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("field", field)
+ .toString();
+ }
+
+}
diff --git a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
index 82109b9..222d725 100644
--- a/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
+++ b/plc4j/drivers/knxnetip/src/main/java/org/apache/plc4x/java/knxnetip/protocol/KnxNetIpProtocolLogic.java
@@ -26,6 +26,7 @@ import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.value.PlcString;
@@ -35,6 +36,8 @@ import org.apache.plc4x.java.knxnetip.configuration.KnxNetIpConfiguration;
import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
import org.apache.plc4x.java.knxnetip.ets5.model.Ets5Model;
import org.apache.plc4x.java.knxnetip.ets5.model.GroupAddress;
+import org.apache.plc4x.java.knxnetip.field.KnxNetIpField;
+import org.apache.plc4x.java.knxnetip.model.KnxNetIpSubscriptionHandle;
import org.apache.plc4x.java.knxnetip.readwrite.KNXGroupAddress;
import org.apache.plc4x.java.knxnetip.readwrite.KNXGroupAddress2Level;
import org.apache.plc4x.java.knxnetip.readwrite.KNXGroupAddress3Level;
@@ -54,7 +57,6 @@ import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
import org.apache.plc4x.java.spi.messages.InternalPlcSubscriptionRequest;
import org.apache.plc4x.java.spi.messages.PlcSubscriber;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
-import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
import org.apache.plc4x.java.spi.model.InternalPlcSubscriptionHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +85,7 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
private byte groupAddressType;
private Ets5Model ets5Model;
- private Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();
+ private Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();
@Override
public void setConfiguration(KnxNetIpConfiguration configuration) {
@@ -284,7 +286,7 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
final PlcStruct dataPoint = new PlcStruct(dataPointMap);
// Send the data-structure.
- publishEvent("knxData", dataPoint);
+ publishEvent(groupAddress, dataPoint);
} else {
LOGGER.warn("Message from: '" + toString(sourceAddress) + "'" +
" to unknown group address: '" + destinationAddress + "'" +
@@ -318,7 +320,13 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> values = new HashMap<>();
for (String fieldName : subscriptionRequest.getFieldNames()) {
- values.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK, new DefaultPlcSubscriptionHandle(this)));
+ final PlcField field = subscriptionRequest.getField(fieldName);
+ if(!(field instanceof KnxNetIpField)) {
+ values.put(fieldName, new ImmutablePair<>(PlcResponseCode.INVALID_ADDRESS, null));
+ } else {
+ values.put(fieldName, new ImmutablePair<>(PlcResponseCode.OK,
+ new KnxNetIpSubscriptionHandle(this, (KnxNetIpField) field)));
+ }
}
return CompletableFuture.completedFuture(
new DefaultPlcSubscriptionResponse((InternalPlcSubscriptionRequest) subscriptionRequest, values));
@@ -328,24 +336,35 @@ public class KnxNetIpProtocolLogic extends Plc4xProtocolBase<KNXNetIPMessage> im
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> collection) {
final DefaultPlcConsumerRegistration consumerRegistration =
new DefaultPlcConsumerRegistration(this, consumer, collection.toArray(new InternalPlcSubscriptionHandle[0]));
- consumerIdMap.put(consumerRegistration.getConsumerHash(), consumer);
+ consumers.put(consumerRegistration, consumer);
return consumerRegistration;
}
@Override
public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) plcConsumerRegistration;
- consumerIdMap.remove(consumerRegistration.getConsumerHash());
+ consumers.remove(consumerRegistration);
}
- protected void publishEvent(String name, PlcValue plcValue) {
+ protected void publishEvent(GroupAddress groupAddress, PlcValue plcValue) {
// Create a subscription event from the input.
final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
- Collections.singletonMap(name, Pair.of(PlcResponseCode.OK, plcValue)));
-
- // Send the subscription event to all listeners.
- for (Consumer<PlcSubscriptionEvent> consumer : consumerIdMap.values()) {
- consumer.accept(event);
+ Collections.singletonMap("knxData", Pair.of(PlcResponseCode.OK, plcValue)));
+
+ // Try sending the subscription event to all listeners.
+ for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
+ final DefaultPlcConsumerRegistration registration = entry.getKey();
+ final Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
+ // Only if the current data point matches the subscription, publish the event to it.
+ for (InternalPlcSubscriptionHandle handle : registration.getAssociatedHandles()) {
+ if(handle instanceof KnxNetIpSubscriptionHandle) {
+ KnxNetIpSubscriptionHandle subscriptionHandle = (KnxNetIpSubscriptionHandle) handle;
+ // Check if the subscription matches this current event.
+ if (subscriptionHandle.getField().matchesGroupAddress(groupAddress)) {
+ consumer.accept(event);
+ }
+ }
+ }
}
}
diff --git a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java b/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
index c676999..02c4344 100644
--- a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
+++ b/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIp.java
@@ -52,7 +52,7 @@ public class ManualKnxNetIp {
// The address and the name is just bogus as we're always returning everything.
// We will probably refactor the API in the near future.
final PlcSubscriptionRequest subscriptionRequest = connection.subscriptionRequestBuilder()
- .addEventField("knxData", "*/*/*")
+ .addEventField("knxData", "*/*/10")
.build();
// Register the subscription
diff --git a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java b/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
deleted file mode 100644
index 31e604a..0000000
--- a/plc4j/drivers/knxnetip/src/test/java/org/apache/plc4x/java/knxnetip/ManualKnxNetIpWithEts5.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.java.knxnetip;
-
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.knxnetip.readwrite.*;
-import org.apache.plc4x.java.knxnetip.ets5.Ets5Parser;
-import org.apache.plc4x.java.knxnetip.ets5.model.Ets5Model;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
-
-public class ManualKnxNetIpWithEts5 {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ManualKnxNetIpWithEts5.class);
-
- private final InetAddress gatewayInetAddress;
- private final Ets5Model ets5Model;
- private final byte groupAddressType;
-
- private ManualKnxNetIpWithEts5(String gatewayAddress, String knxprojFilePath) throws UnknownHostException {
- gatewayInetAddress = InetAddress.getByName(gatewayAddress);
- ets5Model = new Ets5Parser().parse(new File(knxprojFilePath));
- groupAddressType = ets5Model.getGroupAddressType();
- }
-
- private void start() throws PlcConnectionException {
- /*
- ChannelFactory channelFactory = new UdpSocketChannelFactory(
- gatewayInetAddress, KnxNetIpConnection.KNXNET_IP_PORT);
-
- NettyPlcConnection connection = new KnxNetIpConnection(channelFactory, "",
- new PlcMessageToMessageCodec<KNXNetIPMessage, PlcRequestContainer>() {
- @Override
- protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) throws Exception {
- // Ignore for now ...
- }
-
- @Override
- protected void decode(ChannelHandlerContext ctx, KNXNetIPMessage packet, List<Object> out) throws Exception {
- if(packet instanceof TunnelingRequest) {
- TunnelingRequest request = (TunnelingRequest) packet;
- CEMI cemiPayload = request.getCemi();
- if(cemiPayload instanceof CEMIBusmonInd) {
- CEMIBusmonInd cemiBusmonInd = (CEMIBusmonInd) cemiPayload;
- if(cemiBusmonInd.getCemiFrame() instanceof CEMIFrameData) {
- CEMIFrameData cemiDataFrame = (CEMIFrameData) cemiBusmonInd.getCemiFrame();
-
- // The first byte is actually just 6 bit long, but we'll treat it as a full one.
- // So here we create a byte array containing the first and all the following bytes.
- byte[] payload = new byte[1 + cemiDataFrame.getData().length];
- payload[0] = cemiDataFrame.getDataFirstByte();
- System.arraycopy(cemiDataFrame.getData(), 0, payload, 1, cemiDataFrame.getData().length);
-
- final KNXAddress sourceAddress = cemiDataFrame.getSourceAddress();
- final byte[] destinationGroupAddress = cemiDataFrame.getDestinationAddress();
-
- ReadBuffer addressReadBuffer = new ReadBuffer(destinationGroupAddress);
- // Decode the group address depending on the project settings.
- KNXGroupAddress destinationAddress =
- KNXGroupAddressIO.staticParse(addressReadBuffer, groupAddressType);
- final GroupAddress groupAddress = ets5Model.getGroupAddresses().get(destinationAddress);
-
- ReadBuffer rawDataReader = new ReadBuffer(payload);
-
- final KnxDatapoint datapoint = KnxDatapointIO.staticParse(rawDataReader, groupAddress.getType().getMainType(), groupAddress.getType().getSubType());
- final String jsonDatapoint = datapoint.toString(ToStringStyle.JSON_STYLE);
-
- if("Isttemperatur".equals(groupAddress.getName())) {
- LOGGER.info("Message from: '" + ManualKnxNetIpWithEts5.toString(sourceAddress) + "'" +
- " to: '" + ManualKnxNetIpWithEts5.toString(destinationAddress) + "'" +
- "\n location: '" + groupAddress.getFunction().getSpaceName() + "'" +
- " function: '" + groupAddress.getFunction().getName() + "'" +
- " meaning: '" + groupAddress.getName() + "'" +
- " type: '" + groupAddress.getType().getName() + "'" +
- "\n value: '" + jsonDatapoint + "'"
- );
- }
- } else if (cemiBusmonInd.getCemiFrame() instanceof CEMIFrameAck){
- // Just ignore this ...
- } else {
- System.out.println(packet);
- }
- } else {
- System.out.println(packet);
- }
- } else {
- System.out.println(packet);
- }
- }
- });
-
- connection.connect();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- connection.close();
- } catch (PlcConnectionException e) {
- // Just ignore this.
- }
- }));
- */
- }
-
- protected static String toString(KNXAddress knxAddress) {
- return knxAddress.getMainGroup() + "." + knxAddress.getMiddleGroup() + "." + knxAddress.getSubGroup();
- }
-
- protected static String toString(KNXGroupAddress groupAddress) {
- if(groupAddress instanceof KNXGroupAddress3Level) {
- KNXGroupAddress3Level level3 = (KNXGroupAddress3Level) groupAddress;
- return level3.getMainGroup() + "/" + level3.getMiddleGroup() + "/" + level3.getSubGroup();
- } else if(groupAddress instanceof KNXGroupAddress2Level) {
- KNXGroupAddress2Level level2 = (KNXGroupAddress2Level) groupAddress;
- return level2.getMainGroup() + "/" + level2.getSubGroup();
- } else if(groupAddress instanceof KNXGroupAddressFreeLevel) {
- KNXGroupAddressFreeLevel free = (KNXGroupAddressFreeLevel) groupAddress;
- return free.getSubGroup() + "";
- }
- throw new RuntimeException("Unsupported Group Address Type " + groupAddress.getClass().getName());
- }
-
- public static void main(String[] args) throws Exception {
- ManualKnxNetIpWithEts5 connection = new ManualKnxNetIpWithEts5("192.168.42.11",
- "/Users/christofer.dutz/Projects/Apache/PLC4X-Documents/KNX/Stettiner Str. 13/StettinerStr-Soll-Ist-Temperatur.knxproj");
- connection.start();
- TimeUnit.SECONDS.sleep(3000);
- }
-
-}