You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/06/24 08:25:20 UTC

[plc4x] 01/03: Basic test for bacnet live I/O.

This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch feature/bacnet-active
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 053c24ed0dc6c13d463a0d116ba6636762ea580b
Author: Łukasz Dywicki <lu...@code-house.org>
AuthorDate: Wed Jun 17 13:43:40 2020 +0200

    Basic test for bacnet live I/O.
---
 .../resources/protocols/bacnetip/bacnetip.mspec    |  17 +-
 ...etIpDriver.java => AbstractBacNetIpDriver.java} |  38 +---
 .../plc4x/java/bacnetip/ActiveBacNetIpDriver.java  |  65 ++++++
 .../plc4x/java/bacnetip/PassiveBacNetIpDriver.java |  49 +----
 .../configuration/BacNetIpConfiguration.java       |  43 ++++
 .../bacnetip/protocol/BacNetIpProtocolLogic.java   | 237 +++++++++++++++++++++
 .../services/org.apache.plc4x.java.api.PlcDriver   |   1 +
 .../plc4x/java/bacnetip/BacNetDriverMain.java      |  50 +++++
 8 files changed, 412 insertions(+), 88 deletions(-)

diff --git a/protocols/bacnetip/src/main/resources/protocols/bacnetip/bacnetip.mspec b/protocols/bacnetip/src/main/resources/protocols/bacnetip/bacnetip.mspec
index eb001e9..150cb1d 100644
--- a/protocols/bacnetip/src/main/resources/protocols/bacnetip/bacnetip.mspec
+++ b/protocols/bacnetip/src/main/resources/protocols/bacnetip/bacnetip.mspec
@@ -316,20 +316,20 @@
         ['0x07' BACnetUnconfirmedServiceRequestWhoHas
             [const uint 8 'deviceInstanceLowLimitHeader' '0x0B']
             [simple uint 24 'deviceInstanceLowLimit']
+
             [const uint 8 'deviceInstanceHighLimitHeader' '0x1B']
             [simple uint 24 'deviceInstanceHighLimit']
+
             [const uint 8 'objectNameHeader' '0x3D']
             [implicit uint 8 'objectNameLength' 'COUNT(objectName) + 1']
+
             [simple uint 8 'objectNameCharacterSet']
             [array int 8 'objectName' length 'objectNameLength - 1']
         ]
         ['0x08' BACnetUnconfirmedServiceRequestWhoIs
-            [const uint 5 'deviceInstanceRangeLowLimitHeader' '0x01']
-            [simple uint 3 'deviceInstanceRangeLowLimitLength']
-            [array int 8 'deviceInstanceRangeLowLimit' count 'deviceInstanceRangeLowLimitLength']
-            [const uint 5 'deviceInstanceRangeHighLimitHeader' '0x03']
-            [simple uint 3 'deviceInstanceRangeHighLimitLength']
-            [array int 8 'deviceInstanceRangeHighLimit' count 'deviceInstanceRangeHighLimitLength']
+            [optional uint 5 'header'  'len > 12']
+            [optional BACnetDeviceInstanceRange 'low' 'header != null && header == 0x01']
+            [optional BACnetDeviceInstanceRange 'high' 'header != null && header == 0x03']
         ]
         ['0x09' BACnetUnconfirmedServiceRequestUTCTimeSynchronization
         ]
@@ -340,6 +340,11 @@
     ]
 ]
 
+[type 'BACnetDeviceInstanceRange'
+    [simple uint 3 'deviceInstanceRangeLowLimitLength']
+    [array int 8 'deviceInstanceRangeLowLimit' count 'deviceInstanceRangeLowLimitLength']
+]
+
 [discriminatedType 'BACnetServiceAck'
     [discriminator   uint 8 'serviceChoice']
     [typeSwitch 'serviceChoice'
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/AbstractBacNetIpDriver.java
similarity index 76%
copy from sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
copy to sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/AbstractBacNetIpDriver.java
index a4c472c..4932716 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/AbstractBacNetIpDriver.java
@@ -18,7 +18,8 @@ under the License.
 */
 package org.apache.plc4x.java.bacnetip;
 
-import io.netty.buffer.ByteBuf;
+import java.util.function.Consumer;
+import java.util.function.ToIntFunction;
 import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.bacnetip.configuration.PassiveBacNetIpConfiguration;
 import org.apache.plc4x.java.bacnetip.field.BacNetIpFieldHandler;
@@ -31,11 +32,9 @@ import org.apache.plc4x.java.spi.connection.ProtocolStackConfigurer;
 import org.apache.plc4x.java.spi.connection.SingleProtocolStackConfigurer;
 import org.osgi.service.component.annotations.Component;
 
-import java.util.function.Consumer;
-import java.util.function.ToIntFunction;
+import io.netty.buffer.ByteBuf;
 
-@Component(service = PlcDriver.class, immediate = true)
-public class PassiveBacNetIpDriver extends GeneratedDriverBase<BVLC> {
+public abstract class AbstractBacNetIpDriver extends GeneratedDriverBase<BVLC> {
 
     public static final int BACNET_IP_PORT = 47808;
 
@@ -50,44 +49,15 @@ public class PassiveBacNetIpDriver extends GeneratedDriverBase<BVLC> {
     }
 
     @Override
-    protected Class<? extends Configuration> getConfigurationType() {
-        return PassiveBacNetIpConfiguration.class;
-    }
-
-    @Override
     protected String getDefaultTransport() {
         return "udp";
     }
 
     @Override
-    protected boolean canRead() {
-        return false;
-    }
-
-    @Override
-    protected boolean canWrite() {
-        return false;
-    }
-
-    @Override
-    protected boolean canSubscribe() {
-        return true;
-    }
-
-    @Override
     protected BacNetIpFieldHandler getFieldHandler() {
         return new BacNetIpFieldHandler();
     }
 
-    @Override
-    protected ProtocolStackConfigurer<BVLC> getStackConfigurer() {
-        return SingleProtocolStackConfigurer.builder(BVLC.class, BVLCIO.class)
-            .withProtocol(PassiveBacNetIpProtocolLogic.class)
-            .withPacketSizeEstimator(ByteLengthEstimator.class)
-            .withCorruptPacketRemover(CorruptPackageCleaner.class)
-            .build();
-    }
-
     /** Estimate the Length of a Packet */
     public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
         @Override
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/ActiveBacNetIpDriver.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/ActiveBacNetIpDriver.java
new file mode 100644
index 0000000..d0d72fc
--- /dev/null
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/ActiveBacNetIpDriver.java
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.bacnetip;
+
+import org.apache.plc4x.java.bacnetip.configuration.BacNetIpConfiguration;
+import org.apache.plc4x.java.bacnetip.protocol.BacNetIpProtocolLogic;
+import org.apache.plc4x.java.bacnetip.readwrite.BVLC;
+import org.apache.plc4x.java.bacnetip.readwrite.io.BVLCIO;
+import org.apache.plc4x.java.spi.configuration.Configuration;
+import org.apache.plc4x.java.spi.connection.ProtocolStackConfigurer;
+import org.apache.plc4x.java.spi.connection.SingleProtocolStackConfigurer;
+
+/** Driver variant registered under the code "bacnet-ip-active"; it reports read, write and subscribe support. */
+public class ActiveBacNetIpDriver extends AbstractBacNetIpDriver {
+
+    @Override
+    public String getProtocolCode() {
+        return "bacnet-ip-active";
+    }
+
+    @Override
+    protected Class<? extends Configuration> getConfigurationType() {
+        return BacNetIpConfiguration.class;
+    }
+
+    @Override
+    protected boolean canRead() {
+        return true;
+    }
+
+    @Override
+    protected boolean canWrite() {
+        return true;
+    }
+
+    @Override
+    protected boolean canSubscribe() {
+        return true;
+    }
+
+    @Override
+    protected ProtocolStackConfigurer<BVLC> getStackConfigurer() {
+        return SingleProtocolStackConfigurer.builder(BVLC.class, BVLCIO.class)
+            .withProtocol(BacNetIpProtocolLogic.class)
+            .withPacketSizeEstimator(ByteLengthEstimator.class)
+            .withCorruptPacketRemover(CorruptPackageCleaner.class)
+            .build();
+    }
+}
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
index a4c472c..a6020fe 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
@@ -18,36 +18,15 @@ under the License.
 */
 package org.apache.plc4x.java.bacnetip;
 
-import io.netty.buffer.ByteBuf;
-import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.bacnetip.configuration.PassiveBacNetIpConfiguration;
-import org.apache.plc4x.java.bacnetip.field.BacNetIpFieldHandler;
 import org.apache.plc4x.java.bacnetip.protocol.PassiveBacNetIpProtocolLogic;
 import org.apache.plc4x.java.bacnetip.readwrite.BVLC;
 import org.apache.plc4x.java.bacnetip.readwrite.io.BVLCIO;
 import org.apache.plc4x.java.spi.configuration.Configuration;
-import org.apache.plc4x.java.spi.connection.GeneratedDriverBase;
 import org.apache.plc4x.java.spi.connection.ProtocolStackConfigurer;
 import org.apache.plc4x.java.spi.connection.SingleProtocolStackConfigurer;
-import org.osgi.service.component.annotations.Component;
 
-import java.util.function.Consumer;
-import java.util.function.ToIntFunction;
-
-@Component(service = PlcDriver.class, immediate = true)
-public class PassiveBacNetIpDriver extends GeneratedDriverBase<BVLC> {
-
-    public static final int BACNET_IP_PORT = 47808;
-
-    @Override
-    public String getProtocolCode() {
-        return "bacnet-ip";
-    }
-
-    @Override
-    public String getProtocolName() {
-        return "BACnet/IP";
-    }
+public class PassiveBacNetIpDriver extends AbstractBacNetIpDriver {
 
     @Override
     protected Class<? extends Configuration> getConfigurationType() {
@@ -75,11 +54,6 @@ public class PassiveBacNetIpDriver extends GeneratedDriverBase<BVLC> {
     }
 
     @Override
-    protected BacNetIpFieldHandler getFieldHandler() {
-        return new BacNetIpFieldHandler();
-    }
-
-    @Override
     protected ProtocolStackConfigurer<BVLC> getStackConfigurer() {
         return SingleProtocolStackConfigurer.builder(BVLC.class, BVLCIO.class)
             .withProtocol(PassiveBacNetIpProtocolLogic.class)
@@ -88,26 +62,5 @@ public class PassiveBacNetIpDriver extends GeneratedDriverBase<BVLC> {
             .build();
     }
 
-    /** Estimate the Length of a Packet */
-    public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
-        @Override
-        public int applyAsInt(ByteBuf byteBuf) {
-            if (byteBuf.readableBytes() >= 4) {
-                return byteBuf.getUnsignedShort(byteBuf.readerIndex() + 2);
-            }
-            return -1;
-        }
-    }
-
-    /** Consumes all Bytes till another Magic Byte is found */
-    public static class CorruptPackageCleaner implements Consumer<ByteBuf> {
-        @Override
-        public void accept(ByteBuf byteBuf) {
-            while (byteBuf.getUnsignedByte(0) != BVLC.BACNETTYPE) {
-                // Just consume the bytes till the next possible start position.
-                byteBuf.readByte();
-            }
-        }
-    }
 
 }
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/BacNetIpConfiguration.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/BacNetIpConfiguration.java
new file mode 100644
index 0000000..99732d4
--- /dev/null
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/BacNetIpConfiguration.java
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.bacnetip.configuration;
+
+import org.apache.plc4x.java.bacnetip.AbstractBacNetIpDriver;
+import org.apache.plc4x.java.bacnetip.PassiveBacNetIpDriver;
+import org.apache.plc4x.java.spi.configuration.Configuration;
+import org.apache.plc4x.java.spi.configuration.annotations.ConfigurationParameter;
+import org.apache.plc4x.java.spi.configuration.annotations.defaults.DoubleDefaultValue;
+import org.apache.plc4x.java.transport.pcapreplay.PcapReplayTransportConfiguration;
+import org.apache.plc4x.java.transport.rawsocket.RawSocketTransportConfiguration;
+import org.apache.plc4x.java.transport.udp.UdpTransportConfiguration;
+import org.apache.plc4x.java.utils.pcap.netty.handlers.PacketHandler;
+import org.pcap4j.packet.Dot1qVlanTagPacket;
+
+public class BacNetIpConfiguration implements Configuration, UdpTransportConfiguration {
+    // Configuration type of ActiveBacNetIpDriver; it contributes nothing beyond the default UDP port.
+    @Override
+    public int getDefaultPort() {
+        return AbstractBacNetIpDriver.BACNET_IP_PORT;
+    }
+
+    @Override
+    public String toString() {
+        return "port=" + getDefaultPort();
+    }
+}
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/BacNetIpProtocolLogic.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/BacNetIpProtocolLogic.java
new file mode 100644
index 0000000..1a67496
--- /dev/null
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/BacNetIpProtocolLogic.java
@@ -0,0 +1,237 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.bacnetip.protocol;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.PlcInteger;
+import org.apache.plc4x.java.api.value.PlcLong;
+import org.apache.plc4x.java.api.value.PlcString;
+import org.apache.plc4x.java.api.value.PlcStruct;
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.bacnetip.configuration.BacNetIpConfiguration;
+import org.apache.plc4x.java.bacnetip.ede.model.Datapoint;
+import org.apache.plc4x.java.bacnetip.ede.model.EdeModel;
+import org.apache.plc4x.java.bacnetip.field.BacNetIpField;
+import org.apache.plc4x.java.bacnetip.readwrite.APDU;
+import org.apache.plc4x.java.bacnetip.readwrite.APDUComplexAck;
+import org.apache.plc4x.java.bacnetip.readwrite.APDUConfirmedRequest;
+import org.apache.plc4x.java.bacnetip.readwrite.APDUError;
+import org.apache.plc4x.java.bacnetip.readwrite.APDUSimpleAck;
+import org.apache.plc4x.java.bacnetip.readwrite.APDUUnconfirmedRequest;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetConfirmedServiceRequest;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetConfirmedServiceRequestConfirmedCOVNotification;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetConfirmedServiceRequestReadProperty;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetConfirmedServiceRequestSubscribeCOV;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetConfirmedServiceRequestWriteProperty;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetTag;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetTagWithContent;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetUnconfirmedServiceRequest;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetUnconfirmedServiceRequestIAm;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetUnconfirmedServiceRequestUnconfirmedPrivateTransfer;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetUnconfirmedServiceRequestWhoHas;
+import org.apache.plc4x.java.bacnetip.readwrite.BACnetUnconfirmedServiceRequestWhoIs;
+import org.apache.plc4x.java.bacnetip.readwrite.BVLC;
+import org.apache.plc4x.java.bacnetip.readwrite.BVLCForwardedNPDU;
+import org.apache.plc4x.java.bacnetip.readwrite.BVLCOriginalBroadcastNPDU;
+import org.apache.plc4x.java.bacnetip.readwrite.BVLCOriginalUnicastNPDU;
+import org.apache.plc4x.java.bacnetip.readwrite.NPDU;
+import org.apache.plc4x.java.spi.ConversationContext;
+import org.apache.plc4x.java.spi.Plc4xProtocolBase;
+import org.apache.plc4x.java.spi.configuration.HasConfiguration;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
+import org.apache.plc4x.java.spi.messages.InternalPlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
+import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
+import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
+import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.model.InternalPlcSubscriptionHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Protocol logic of the active BACnet/IP driver: broadcasts a Who-Is on connect and manages subscription consumers. */
+public class BacNetIpProtocolLogic extends Plc4xProtocolBase<BVLC> implements HasConfiguration<BacNetIpConfiguration>, PlcSubscriber {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BacNetIpProtocolLogic.class);
+
+    private final AtomicInteger requestId = new AtomicInteger();
+
+    private final Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void setConfiguration(BacNetIpConfiguration configuration) {
+        // Intentionally empty: the configuration is not evaluated by this logic yet.
+    }
+
+    /** Reports the connection as established and then broadcasts an unconfirmed Who-Is request. */
+    @Override
+    public void onConnect(ConversationContext<BVLC> context) {
+        context.fireConnected();
+
+        APDUUnconfirmedRequest apdu = new APDUUnconfirmedRequest(
+            new BACnetUnconfirmedServiceRequestWhoIs((short) 0x08, null, null)
+        );
+
+        BVLCOriginalBroadcastNPDU broadcastNPDU = new BVLCOriginalBroadcastNPDU(
+            new NPDU(
+                (short) 0,
+                true,
+                false,
+                false,
+                false,
+                (byte) 0x0,
+                0,
+                (short) 0,
+                new short[] {},
+                0,
+                (short) 0,
+                new short[] {},
+                (short) 0,
+                null,
+                apdu
+            )
+        );
+        context.sendToWire(broadcastNPDU);
+    }
+
+    @Override
+    public void close(ConversationContext<BVLC> context) {
+        // Nothing to do here ...
+    }
+
+    /** Unwraps the NPDU of unicast, forwarded or broadcast frames and classifies its APDU; unknown types are logged. */
+    @Override
+    protected void decode(ConversationContext<BVLC> context, BVLC msg) throws Exception {
+        NPDU npdu = null;
+        if (msg instanceof BVLCOriginalUnicastNPDU) {
+            npdu = ((BVLCOriginalUnicastNPDU) msg).getNpdu();
+        } else if (msg instanceof BVLCForwardedNPDU) {
+            npdu = ((BVLCForwardedNPDU) msg).getNpdu();
+        } else if (msg instanceof BVLCOriginalBroadcastNPDU) {
+            npdu = ((BVLCOriginalBroadcastNPDU) msg).getNpdu();
+        }
+
+        if (npdu != null) {
+            final APDU apdu = npdu.getApdu();
+            if (apdu instanceof APDUConfirmedRequest) {
+                final BACnetConfirmedServiceRequest serviceRequest =
+                    ((APDUConfirmedRequest) apdu).getServiceRequest();
+                // A value change subscription event.
+                if (serviceRequest instanceof BACnetConfirmedServiceRequestConfirmedCOVNotification) {
+                    // Not evaluated yet: the notification is currently dropped.
+                }
+                // Someone read a value.
+                else if (serviceRequest instanceof BACnetConfirmedServiceRequestReadProperty) {
+                    // Ignore this ...
+                }
+                // Someone wrote a value.
+                else if (serviceRequest instanceof BACnetConfirmedServiceRequestWriteProperty) {
+                    // Ignore this ...
+                } else if (serviceRequest instanceof BACnetConfirmedServiceRequestSubscribeCOV) {
+                    // Ignore this ...
+                } else {
+                    LOGGER.debug("Unexpected ConfirmedServiceRequest type: {}", serviceRequest.getClass().getName());
+                }
+            } else if (apdu instanceof APDUUnconfirmedRequest) {
+                final BACnetUnconfirmedServiceRequest serviceRequest =
+                    ((APDUUnconfirmedRequest) apdu).getServiceRequest();
+                if (serviceRequest instanceof BACnetUnconfirmedServiceRequestWhoHas) {
+                    // Ignore this ...
+                } else if (serviceRequest instanceof BACnetUnconfirmedServiceRequestWhoIs) {
+                    // Ignore this ...
+                } else if (serviceRequest instanceof BACnetUnconfirmedServiceRequestIAm) {
+                    // Ignore this ...
+                } else if (serviceRequest instanceof BACnetUnconfirmedServiceRequestUnconfirmedPrivateTransfer) {
+                    // Ignore this ...
+                } else {
+                    LOGGER.debug("Unexpected UnconfirmedServiceRequest type: {}", serviceRequest.getClass().getName());
+                }
+            } else if (apdu instanceof APDUError) {
+                // Ignore this ... (error responses are not evaluated yet)
+            } else if (apdu instanceof APDUSimpleAck) {
+                // Ignore this ...
+            } else if (apdu instanceof APDUComplexAck) {
+                // Ignore this ...
+            } else if ((apdu == null) && (npdu.getNlm() != null)) {
+                // "Who is router?" & "I am router" messages.
+                // Ignore this ...
+            } else {
+                LOGGER.debug("Unexpected NPDU type: {}", npdu.getClass().getName());
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        Map<String, ResponseItem<PlcSubscriptionHandle>> values = new HashMap<>();
+        for (String fieldName : subscriptionRequest.getFieldNames()) {
+            values.put(fieldName, new ResponseItem<>(PlcResponseCode.OK, new DefaultPlcSubscriptionHandle(this)));
+        }
+
+        return CompletableFuture.completedFuture(
+            new DefaultPlcSubscriptionResponse((InternalPlcSubscriptionRequest) subscriptionRequest, values));
+    }
+
+    @Override
+    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> collection) {
+        final DefaultPlcConsumerRegistration consumerRegistration =
+            new DefaultPlcConsumerRegistration(this, consumer, collection.toArray(new InternalPlcSubscriptionHandle[0]));
+        consumerIdMap.put(consumerRegistration.getConsumerHash(), consumer);
+        return consumerRegistration;
+    }
+
+    @Override
+    public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
+        DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) plcConsumerRegistration;
+        consumerIdMap.remove(consumerRegistration.getConsumerHash());
+    }
+
+    /** Wraps the value in a subscription event under the key "event" and hands it to every registered consumer. */
+    protected void publishEvent(BacNetIpField field, PlcValue plcValue) {
+        // Create a subscription event from the input.
+        final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
+            Collections.singletonMap("event", new ResponseItem<PlcValue>(PlcResponseCode.OK, plcValue)));
+
+        // Send the subscription event to all listeners.
+        for (Consumer<PlcSubscriptionEvent> consumer : consumerIdMap.values()) {
+            // TODO: Check if the subscription matches the current field ..
+            consumer.accept(event);
+        }
+    }
+
+    private String toString(BacNetIpField field) {
+        return field.getDeviceIdentifier() + "/" + field.getObjectType() + "/" + field.getObjectInstance();
+    }
+
+}
diff --git a/sandbox/test-java-bacnetip-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver b/sandbox/test-java-bacnetip-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
index 9c311c9..b240d8a 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
+++ b/sandbox/test-java-bacnetip-driver/src/main/resources/META-INF/services/org.apache.plc4x.java.api.PlcDriver
@@ -17,3 +17,4 @@
 # under the License.
 #
 org.apache.plc4x.java.bacnetip.PassiveBacNetIpDriver
+org.apache.plc4x.java.bacnetip.ActiveBacNetIpDriver
diff --git a/sandbox/test-java-bacnetip-driver/src/test/java/org/apache/plc4x/java/bacnetip/BacNetDriverMain.java b/sandbox/test-java-bacnetip-driver/src/test/java/org/apache/plc4x/java/bacnetip/BacNetDriverMain.java
new file mode 100644
index 0000000..18d50c7
--- /dev/null
+++ b/sandbox/test-java-bacnetip-driver/src/test/java/org/apache/plc4x/java/bacnetip/BacNetDriverMain.java
@@ -0,0 +1,50 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.bacnetip;
+
+import java.util.function.Consumer;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.value.PlcStruct;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+
+/** Manual smoke test that subscribes to a live BACnet/IP device and prints the value of each received event. */
+public class BacNetDriverMain {
+
+    /** @param args optional connection string as first element; defaults to the original hard-coded test device */
+    public static void main(String[] args) throws Exception {
+        final String url = args.length > 0 ? args[0] : "bacnet-ip-active:udp://192.168.2.106";
+        final PlcConnection connection = new PlcDriverManager().getConnection(url);
+        connection.connect();
+        PlcSubscriptionRequest plcSubscriptionRequest = connection.subscriptionRequestBuilder()
+            .addEventField("Hurz", "*/*/*")
+            .build();
+
+        final PlcSubscriptionResponse subscriptionResponse = plcSubscriptionRequest.execute().get();
+        // Print the value as-is: casting it to PlcStruct would throw for any non-struct value.
+        // NOTE: the connection is never closed here, presumably so the process keeps receiving events.
+        subscriptionResponse.getSubscriptionHandle("Hurz").register(event -> {
+            System.out.println(((DefaultPlcSubscriptionEvent) event).getValues().get("event").getValue());
+        });
+    }
+
+}