You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/07/11 14:57:49 UTC

[incubator-plc4x] branch master updated (22d5106 -> 748a39d)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


    from 22d5106  added Test for Plc4XModbusProtocol and fixed several Bugs.
     add a66f66d  Refactored the way the maps the enums are built up.
     add 7a2b14b  Added some EtherNet/IP related documentation.
     new 748a39d  PLC4X-40 - Refactor the PlcSubscriber to be more aligned with the PlcReader and PlcWriter

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java |  31 ++-
 .../java/org/apache/plc4x/camel/ManualTest.java    |   2 +-
 .../java/org/apache/plc4x/camel/MockDriver.java    |  57 ++++--
 .../plc4x/java/api/connection/PlcSubscriber.java   |  24 +--
 .../plc4x/java/api/messages/PlcNotification.java   |  80 --------
 .../apache/plc4x/java/api/messages/PlcRequest.java |   2 +-
 .../java/api/messages/PlcRequestContainer.java     |   8 +
 .../{PlcMessage.java => PlcSubscriptionEvent.java} |  21 ++-
 ...PlcMessage.java => PlcSubscriptionRequest.java} |   9 +-
 ...lcMessage.java => PlcSubscriptionResponse.java} |  16 +-
 .../api/messages/PlcUnsubscriptionRequest.java     |  54 ++++++
 .../PlcUnsubscriptionResponse.java}                |   4 +-
 .../java/api/messages/items/ReadRequestItem.java   |  32 ++--
 .../java/api/messages/items/ReadResponseItem.java  |  28 +--
 .../plc4x/java/api/messages/items/RequestItem.java |  28 +--
 .../java/api/messages/items/ResponseItem.java      |  28 +--
 .../api/messages/items/SubscriptionEventItem.java  |  43 ++---
 ...a => SubscriptionRequestChangeOfStateItem.java} |  11 +-
 .../items/SubscriptionRequestCyclicItem.java}      |  28 +--
 ...Item.java => SubscriptionRequestEventItem.java} |  11 +-
 .../messages/items/SubscriptionRequestItem.java}   |  34 ++--
 .../messages/items/SubscriptionResponseItem.java}  |  27 ++-
 .../messages/items/UnsubscriptionRequestItem.java  |  62 ++++++
 .../items/UnsubscriptionResponseItem.java}         |  14 +-
 .../java/api/messages/items/WriteRequestItem.java  |  28 +--
 .../java/api/messages/items/WriteResponseItem.java |  28 +--
 .../SubscriptionHandle.java}                       |  12 +-
 .../plc4x/java/api/model/SubscriptionType.java}    |  26 ++-
 .../ads/connection/AdsAbstractPlcConnection.java   |   5 +
 .../java/ads/connection/AdsTcpPlcConnection.java   | 208 ++++++++++++---------
 .../java/ads/model/AdsSubscriptionHandle.java}     |  23 +--
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  |  24 ++-
 .../ads/connection/AdsTcpPlcConnectionTests.java   |  46 +++--
 .../java/isotp/netty/model/types/DeviceGroup.java  |  17 +-
 .../isotp/netty/model/types/DisconnectReason.java  |  17 +-
 .../isotp/netty/model/types/ParameterCode.java     |  17 +-
 .../isotp/netty/model/types/ProtocolClass.java     |  17 +-
 .../java/isotp/netty/model/types/RejectCause.java  |  17 +-
 .../java/isotp/netty/model/types/TpduCode.java     |  17 +-
 .../java/isotp/netty/model/types/TpduSize.java     |  17 +-
 .../netty/model/types/DataTransportErrorCode.java  |  17 +-
 .../s7/netty/model/types/DataTransportSize.java    |  17 +-
 .../s7/netty/model/types/HeaderErrorClass.java     |  17 +-
 .../java/s7/netty/model/types/MemoryArea.java      |  17 +-
 .../java/s7/netty/model/types/MessageType.java     |  17 +-
 .../java/s7/netty/model/types/ParameterError.java  |  17 +-
 .../java/s7/netty/model/types/ParameterType.java   |  17 +-
 .../s7/netty/model/types/SpecificationType.java    |  17 +-
 .../java/s7/netty/model/types/TransportSize.java   |  17 +-
 .../netty/model/types/VariableAddressingMode.java  |  17 +-
 pom.xml                                            |   3 +
 src/site/asciidoc/developers/vpn.adoc              |   5 +
 52 files changed, 774 insertions(+), 577 deletions(-)
 delete mode 100644 plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
 copy plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/{PlcMessage.java => PlcSubscriptionEvent.java} (66%)
 copy plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/{PlcMessage.java => PlcSubscriptionRequest.java} (83%)
 copy plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/{PlcMessage.java => PlcSubscriptionResponse.java} (61%)
 create mode 100644 plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
 copy plc4j/api/src/main/java/org/apache/plc4x/java/api/{authentication/PlcAuthentication.java => messages/PlcUnsubscriptionResponse.java} (87%)
 copy integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XConsumerTest.java => plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java (51%)
 copy plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/{WriteResponseItem.java => SubscriptionRequestChangeOfStateItem.java} (66%)
 copy plc4j/{protocols/s7/src/main/java/org/apache/plc4x/java/s7/model/S7Address.java => api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java} (54%)
 copy plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/{WriteResponseItem.java => SubscriptionRequestEventItem.java} (68%)
 copy plc4j/api/src/{test/java/org/apache/plc4x/java/api/connection/PlcWriterTest.java => main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java} (51%)
 copy plc4j/{protocols/ads/src/main/java/org/apache/plc4x/java/ads/api/commands/AdsDeviceNotificationResponse.java => api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java} (55%)
 create mode 100644 plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java
 copy plc4j/{protocols/ads/src/main/java/org/apache/plc4x/java/ads/api/util/ByteBufSupplier.java => api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java} (74%)
 copy plc4j/api/src/main/java/org/apache/plc4x/java/api/{messages/PlcMessage.java => model/SubscriptionHandle.java} (60%)
 copy plc4j/{protocols/s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/params/TsapParameter.java => api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java} (58%)
 copy plc4j/protocols/{s7/src/main/java/org/apache/plc4x/java/isotp/netty/model/params/DisconnectAdditionalInformationParameter.java => ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java} (60%)


[incubator-plc4x] 01/01: PLC4X-40 - Refactor the PlcSubscriber to be more aligned with the PlcReader and PlcWriter

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 748a39d8f3cdbabb9f4e086f1aaa4877ed042dd4
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Jul 11 16:57:39 2018 +0200

    PLC4X-40 - Refactor the PlcSubscriber to be more aligned with the PlcReader and PlcWriter
---
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java |  31 ++-
 .../java/org/apache/plc4x/camel/ManualTest.java    |   2 +-
 .../java/org/apache/plc4x/camel/MockDriver.java    |  57 ++++--
 .../plc4x/java/api/connection/PlcSubscriber.java   |  24 +--
 .../plc4x/java/api/messages/PlcNotification.java   |  80 --------
 .../apache/plc4x/java/api/messages/PlcRequest.java |   2 +-
 .../java/api/messages/PlcRequestContainer.java     |   8 +
 ...ResponseItem.java => PlcSubscriptionEvent.java} |  18 +-
 ...sponseItem.java => PlcSubscriptionRequest.java} |  10 +-
 ...ponseItem.java => PlcSubscriptionResponse.java} |  13 +-
 .../api/messages/PlcUnsubscriptionRequest.java     |  54 ++++++
 ...nseItem.java => PlcUnsubscriptionResponse.java} |  11 +-
 .../java/api/messages/items/ReadRequestItem.java   |  32 ++--
 .../java/api/messages/items/ReadResponseItem.java  |  28 +--
 .../plc4x/java/api/messages/items/RequestItem.java |  28 +--
 .../java/api/messages/items/ResponseItem.java      |  28 +--
 .../api/messages/items/SubscriptionEventItem.java  |  48 +++++
 ...a => SubscriptionRequestChangeOfStateItem.java} |  11 +-
 ...tem.java => SubscriptionRequestCyclicItem.java} |  25 ++-
 ...Item.java => SubscriptionRequestEventItem.java} |  11 +-
 .../messages/items/SubscriptionRequestItem.java    |  45 +++++
 .../messages/items/SubscriptionResponseItem.java   |  37 ++++
 .../messages/items/UnsubscriptionRequestItem.java  |  62 ++++++
 .../messages/items/UnsubscriptionResponseItem.java |  29 +++
 .../java/api/messages/items/WriteRequestItem.java  |  28 +--
 .../java/api/messages/items/WriteResponseItem.java |  28 +--
 .../SubscriptionHandle.java}                       |  19 +-
 .../SubscriptionType.java}                         |  25 ++-
 .../ads/connection/AdsAbstractPlcConnection.java   |   5 +
 .../java/ads/connection/AdsTcpPlcConnection.java   | 208 ++++++++++++---------
 .../java/ads/model/AdsSubscriptionHandle.java}     |  17 +-
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  |  24 ++-
 .../ads/connection/AdsTcpPlcConnectionTests.java   |  46 +++--
 pom.xml                                            |   3 +
 34 files changed, 722 insertions(+), 375 deletions(-)

diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 08891e7..16fc626 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -26,14 +26,21 @@ import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.items.*;
 import org.apache.plc4x.java.api.model.Address;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<PlcNotification<Object>> {
+public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<SubscriptionEventItem> {
     private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
 
     private Plc4XEndpoint endpoint;
@@ -42,6 +49,7 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     private PlcConnection plcConnection;
     private Address address;
     private Class<?> dataType;
+    private PlcSubscriptionResponse subscriptionResponse;
 
 
     public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
@@ -73,13 +81,20 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     }
 
     @Override
-    protected void doStart() {
-        getSubscriber().subscribe(this, address, dataType);
+    protected void doStart() throws InterruptedException, ExecutionException, TimeoutException {
+        PlcSubscriptionRequest request = new PlcSubscriptionRequest();
+        request.addItem(new SubscriptionRequestCyclicItem(dataType, address, this, TimeUnit.SECONDS, 3));
+        CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = getSubscriber().subscribe(request);
+        subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
     }
 
     @Override
     protected void doStop() {
-        getSubscriber().unsubscribe(this, address);
+        PlcUnsubscriptionRequest request = new PlcUnsubscriptionRequest();
+        subscriptionResponse.getResponseItems().stream()
+            .map(SubscriptionResponseItem::getSubscriptionHandle)
+            .map(UnsubscriptionRequestItem::new)
+            .forEach(request::addItem);
         try {
             plcConnection.close();
         } catch (Exception e) {
@@ -92,11 +107,11 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     }
 
     @Override
-    public void accept(PlcNotification<Object> plcNotification) {
-        LOGGER.debug("Received {}", plcNotification);
+    public void accept(SubscriptionEventItem subscriptionEventItem) {
+        LOGGER.debug("Received {}", subscriptionEventItem);
         try {
             Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setBody(unwrapIfSingle(plcNotification.getValues()));
+            exchange.getIn().setBody(unwrapIfSingle(subscriptionEventItem.getValues()));
             processor.process(exchange);
         } catch (Exception e) {
             exceptionHandler.handleException(e);
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
index cd96f9f..9eff617 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
@@ -53,7 +53,7 @@ public class ManualTest {
     private static class MyRouteBuilder extends RouteBuilder {
         @Override
         public void configure() {
-            from("plc4x:ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000?dataType=java.lang.Integer&address=Allgemein_S2.Station")
+            from("plc4x:ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.3.1.1:30000?dataType=java.lang.Integer&address=Allgemein_S2.Station")
                 .process(exchange -> System.out.println("Invoked timer at " + new Date()))
                 .bean("foo")
                 .log("Received ${body}");
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index dca996b..f7c00cb 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -25,18 +25,27 @@ import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+import org.apache.plc4x.java.api.types.ResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Calendar;
 import java.util.Collections;
-import java.util.Date;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.mockito.Mockito.*;
 
@@ -58,6 +67,7 @@ public class MockDriver implements PlcDriver {
 
     @Override
     public PlcConnection connect(String url) {
+        // Mock a connection.
         PlcConnection plcConnectionMock = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
         try {
             when(plcConnectionMock.parseAddress(anyString())).thenReturn(mock(Address.class));
@@ -65,29 +75,40 @@ public class MockDriver implements PlcDriver {
             throw new RuntimeException(e);
         }
         when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
+
+        // Mock a typical subscriber.
         PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS);
-        doAnswer(invocation -> {
+        when(plcSubscriber.subscribe(any())).thenAnswer(invocation -> {
             LOGGER.info("Received {}", invocation);
-            Consumer consumer = invocation.getArgument(0);
-            executorService.submit(() -> {
-                while (!Thread.currentThread().isInterrupted()) {
-                    consumer.accept(new PlcNotification(new Date(), mock(Address.class), Collections.singletonList("HelloWorld")));
-                    try {
-                        TimeUnit.MILLISECONDS.sleep(100);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
-                }
-            });
-            return null;
-        }).when(plcSubscriber).subscribe(any(), any(), any());
+            PlcSubscriptionRequest subscriptionRequest = invocation.getArgument(0);
+            List<SubscriptionResponseItem<?>> responseItems =
+                subscriptionRequest.getRequestItems().stream().map(subscriptionRequestItem -> {
+                    Consumer consumer = subscriptionRequestItem.getConsumer();
+                    executorService.submit(() -> {
+                        while (!Thread.currentThread().isInterrupted()) {
+                            consumer.accept(new SubscriptionEventItem<>(null, Calendar.getInstance(), Collections.singletonList("HelloWorld")));
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(100);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    });
+                    return new SubscriptionResponseItem<>(subscriptionRequestItem,
+                        mock(SubscriptionHandle.class, RETURNS_DEEP_STUBS), ResponseCode.OK);
+                }).collect(Collectors.toList());
+            PlcSubscriptionResponse response = new PlcSubscriptionResponse(subscriptionRequest, responseItems);
+            CompletableFuture<PlcSubscriptionResponse> responseFuture = new CompletableFuture<>();
+            responseFuture.complete(response);
+            return responseFuture;
+        });
         when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber));
         return plcConnectionMock;
     }
 
     @Override
-    public PlcConnection connect(String url, PlcAuthentication authentication) throws PlcConnectionException {
+    public PlcConnection connect(String url, PlcAuthentication authentication) {
         return connect(null);
     }
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
index 4296df3..cd52d48 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
@@ -18,9 +18,10 @@
  */
 package org.apache.plc4x.java.api.connection;
 
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.model.Address;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 /**
@@ -29,21 +30,20 @@ import java.util.function.Consumer;
 public interface PlcSubscriber {
 
     /**
-     * Subscribes a {@code consumer} to a {@code address} parsing values as {@code dataType}.
-     * {@code consumer} and {@code address} are used as unique identification.
+     * Subscribes to addresses on the PLC.
      *
-     * @param consumer to be subscribed.
-     * @param address  to be read.
-     * @param dataType to be decoded.
+     * @param subscriptionRequest subscription request containing at least one subscription request item.
+     * @return subscription response containing a subscription response item for each subscription request item.
      */
-    <T extends R, R> void subscribe(Consumer<PlcNotification<R>> consumer, Address address, Class<T> dataType);
-
+    CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest);
 
     /**
-     * Unsubscribes a {@code consumer}.
-     * {@code consumer} and {@code address} are used as unique identification.
+     * Unsubscribes from addresses on the PLC. For unsubscribing the unsubscription request uses the subscription
+     * handle returned as part of the subscription response item.
      *
-     * @param consumer to be unsubscribed.
+     * @param unsubscriptionRequest unsubscription request containing at least one unsubscription request item.
+     * @return unsubscription response containing an unsubscription response item for each unsubscription request item.
      */
-    <R> void unsubscribe(Consumer<PlcNotification<R>> consumer, Address address);
+    CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest);
+
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
deleted file mode 100644
index ce68982..0000000
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.api.messages;
-
-import org.apache.plc4x.java.api.model.Address;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Objects;
-
-public class PlcNotification<T> {
-
-    protected final Date timeStamp;
-
-    protected final Address address;
-
-    protected final List<T> values;
-
-    public PlcNotification(Date timeStamp, Address address, List<T> values) {
-        this.timeStamp = timeStamp;
-        this.address = address;
-        this.values = values;
-    }
-
-    public Date getTimeStamp() {
-        return timeStamp;
-    }
-
-    public Address getAddress() {
-        return address;
-    }
-
-    public List<T> getValues() {
-        return values;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof PlcNotification)) {
-            return false;
-        }
-        PlcNotification<?> that = (PlcNotification<?>) o;
-        return Objects.equals(timeStamp, that.timeStamp) &&
-            Objects.equals(address, that.address) &&
-            Objects.equals(values, that.values);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(timeStamp, address, values);
-    }
-
-    @Override
-    public String toString() {
-        return "PlcNotification{" +
-            "timeStamp=" + timeStamp +
-            ", address=" + address +
-            ", values=" + values +
-            '}';
-    }
-}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
index 3ad5fc1..3ce851b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
@@ -101,7 +101,7 @@ public abstract class PlcRequest<REQUEST_ITEM extends RequestItem> implements Pl
     @Override
     public String toString() {
         return "PlcRequest{" +
-            "requestItems=" + requestItems +
+            "eventItems=" + requestItems +
             '}';
     }
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
index 4168246..f7259f6 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
@@ -21,6 +21,14 @@ package org.apache.plc4x.java.api.messages;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
+/**
+ * Helper mostly used internally to split up big requests into multiple sub-requests.
+ *
+ * TODO: Think about moving this into one of the driver-base modules.
+ *
+ * @param <T>
+ * @param <R>
+ */
 public class PlcRequestContainer<T extends PlcRequest, R extends PlcResponse> implements ProtocolMessage {
 
     private final T request;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
similarity index 63%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
index e923524..e8b1d0d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
@@ -1,3 +1,9 @@
+package org.apache.plc4x.java.api.messages;
+
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+
+import java.util.List;
+
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +22,16 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
+public class PlcSubscriptionEvent {
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+    protected final List<SubscriptionEventItem<?>> eventItems;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+    public PlcSubscriptionEvent(List<SubscriptionEventItem<?>> eventItems) {
+        this.eventItems = eventItems;
+    }
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+    public List<SubscriptionEventItem<?>> getEventItems() {
+        return eventItems;
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
similarity index 70%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index e923524..65b0659 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,9 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+public class PlcSubscriptionRequest extends PlcRequest<SubscriptionRequestItem<?>> {
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
index e923524..3c271df 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,16 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.List;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class PlcSubscriptionResponse extends PlcResponse<PlcSubscriptionRequest, SubscriptionResponseItem<?>, SubscriptionRequestItem<?>> {
+
+    public PlcSubscriptionResponse(PlcSubscriptionRequest request, List<SubscriptionResponseItem<?>> subscriptionResponseItems) {
+        super(request, subscriptionResponseItems);
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
new file mode 100644
index 0000000..94377d3
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.api.messages;
+
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+public class PlcUnsubscriptionRequest implements PlcMessage {
+    // Message holding the UnsubscriptionRequestItems of one unsubscription request; items may be passed in or added later.
+    protected final List<UnsubscriptionRequestItem> requestItems;
+    /** Creates a request backed by a new, empty LinkedList. */
+    public PlcUnsubscriptionRequest() {
+        this.requestItems = new LinkedList<>();
+    }
+    /** Uses the given list as-is (no defensive copy); throws NullPointerException if it is null. */
+    public PlcUnsubscriptionRequest(List<UnsubscriptionRequestItem> requestItems) {
+        Objects.requireNonNull(requestItems, "Request items must not be null");
+        this.requestItems = requestItems;
+    }
+    /** Appends the item to the list returned by getRequestItems(); throws NullPointerException if the item is null. */
+    public void addItem(UnsubscriptionRequestItem unsubscriptionRequestItem) {
+        Objects.requireNonNull(unsubscriptionRequestItem, "Request item must not be null");
+        getRequestItems().add(unsubscriptionRequestItem);
+    }
+    /** Returns the internal list itself, not a copy. */
+    public List<UnsubscriptionRequestItem> getRequestItems() {
+        return requestItems;
+    }
+    /** Returns the size of the list returned by getRequestItems(). */
+    public int getNumberOfItems() {
+        return getRequestItems().size();
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
similarity index 70%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
index e923524..f75415a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,6 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
-
-import org.apache.plc4x.java.api.types.ResponseCode;
-
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+public class PlcUnsubscriptionResponse implements PlcMessage { // Declares no fields or methods; it only tags the type as a PlcMessage.
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
index 222310e..bc77dc5 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
index 9d5a4d6..f7736e8 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
index 4df0e4e..39a3ec2 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
index 34f61b1..5cb6d9b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
new file mode 100644
index 0000000..49a0200
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
@@ -0,0 +1,48 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import java.util.Calendar;
+import java.util.List;
+
+public class SubscriptionEventItem<T> {
+    // Bundles a SubscriptionRequestItem with a timestamp and a list of values of type T.
+    private SubscriptionRequestItem<T> subscriptionRequestItem;
+    private Calendar timestamp;
+    private List<T> values;
+    /** Stores the three arguments as given: no null checks and no copies of the Calendar or the list. */
+    public SubscriptionEventItem(SubscriptionRequestItem<T> subscriptionRequestItem, Calendar timestamp, List<T> values) {
+        this.subscriptionRequestItem = subscriptionRequestItem;
+        this.timestamp = timestamp;
+        this.values = values;
+    }
+    /** Returns the request item passed to the constructor. */
+    public SubscriptionRequestItem<T> getSubscriptionRequestItem() {
+        return subscriptionRequestItem;
+    }
+    /** Returns the stored Calendar instance itself (not a copy). */
+    public Calendar getTimestamp() {
+        return timestamp;
+    }
+    /** Returns the stored list itself (not a copy). */
+    public List<T> getValues() {
+        return values;
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
similarity index 66%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
index e923524..57db70f 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
@@ -18,12 +18,15 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.function.Consumer;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class SubscriptionRequestChangeOfStateItem extends SubscriptionRequestItem {
+    // Request item fixed to SubscriptionType.CHANGE_OF_STATE. NOTE(review): Class, Consumer and the superclass are raw types here.
+    public SubscriptionRequestChangeOfStateItem(Class datatype, Address address, Consumer consumer) {
+        super(datatype, address, SubscriptionType.CHANGE_OF_STATE, consumer);
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
similarity index 54%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
index e923524..b1a491b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
@@ -18,12 +18,29 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class SubscriptionRequestCyclicItem extends SubscriptionRequestItem {
+    // Request item fixed to SubscriptionType.CYCLIC that additionally carries a period and a TimeUnit.
+    private TimeUnit timeUnit;
+    private int period;
+    /** Stores timeUnit and period as given (no validation). NOTE(review): Class, Consumer and the superclass are raw types here. */
+    public SubscriptionRequestCyclicItem(Class datatype, Address address, Consumer consumer, TimeUnit timeUnit, int period) {
+        super(datatype, address, SubscriptionType.CYCLIC, consumer);
+        this.timeUnit = timeUnit;
+        this.period = period;
+    }
+    /** Returns the TimeUnit passed to the constructor (presumably the unit of the period; confirm with the drivers). */
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+    /** Returns the period passed to the constructor. */
+    public int getPeriod() {
+        return period;
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
similarity index 68%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
index e923524..8ed69c5 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
@@ -18,12 +18,15 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.function.Consumer;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class SubscriptionRequestEventItem extends SubscriptionRequestItem {
+    // Request item fixed to SubscriptionType.EVENT. NOTE(review): Class, Consumer and the superclass are raw types here.
+    public SubscriptionRequestEventItem(Class datatype, Address address, Consumer consumer) {
+        super(datatype, address, SubscriptionType.EVENT, consumer);
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
new file mode 100644
index 0000000..7591e09
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
@@ -0,0 +1,45 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
+
+import java.util.function.Consumer;
+
+public abstract class SubscriptionRequestItem<T> extends RequestItem<T> {
+    // Base class for subscription request items: extends RequestItem<T> with a SubscriptionType and a consumer of SubscriptionEventItem<T>.
+    private SubscriptionType subscriptionType;
+    private Consumer<SubscriptionEventItem<T>> consumer;
+    /** Passes datatype and address to RequestItem and stores subscriptionType and consumer as given (no null checks). */
+    public SubscriptionRequestItem(Class<T> datatype, Address address, SubscriptionType subscriptionType, Consumer<SubscriptionEventItem<T>> consumer) {
+        super(datatype, address);
+        this.subscriptionType = subscriptionType;
+        this.consumer = consumer;
+    }
+    /** Returns the subscription type passed to the constructor. */
+    public SubscriptionType getSubscriptionType() {
+        return subscriptionType;
+    }
+    /** Returns the consumer passed to the constructor. */
+    public Consumer<SubscriptionEventItem<T>> getConsumer() {
+        return consumer;
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java
new file mode 100644
index 0000000..c38e28b
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java
@@ -0,0 +1,37 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+import org.apache.plc4x.java.api.types.ResponseCode;
+
+public class SubscriptionResponseItem<T> extends ResponseItem<SubscriptionRequestItem<T>>  {
+    // Response item for a SubscriptionRequestItem that additionally carries a SubscriptionHandle.
+    private SubscriptionHandle subscriptionHandle;
+    /** Passes requestItem and responseCode to ResponseItem and stores the handle as given (no null check). */
+    public SubscriptionResponseItem(SubscriptionRequestItem<T> requestItem, SubscriptionHandle subscriptionHandle, ResponseCode responseCode) {
+        super(requestItem, responseCode);
+        this.subscriptionHandle = subscriptionHandle;
+    }
+    /** Returns the handle passed to the constructor. */
+    public SubscriptionHandle getSubscriptionHandle() {
+        return subscriptionHandle;
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java
new file mode 100644
index 0000000..d3312f3
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java
@@ -0,0 +1,62 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+
+import java.util.Objects;
+
+public class UnsubscriptionRequestItem {
+    // Wraps the SubscriptionHandle of one unsubscription request item; equals and hashCode are based on that handle only.
+    private SubscriptionHandle subscriptionHandle;
+    /** Stores the handle; throws NullPointerException if it is null. */
+    public UnsubscriptionRequestItem(SubscriptionHandle subscriptionHandle) {
+        Objects.requireNonNull(subscriptionHandle, "SubscriptionHandle must not be null");
+        this.subscriptionHandle = subscriptionHandle;
+    }
+    /** Returns the handle passed to the constructor. */
+    public SubscriptionHandle getSubscriptionHandle() {
+        return subscriptionHandle;
+    }
+    /** Renders as "UnsubscriptionRequestItem{subscriptionHandle=...}" using the handle's string form. */
+    @Override
+    public String toString() {
+        return "UnsubscriptionRequestItem{" +
+            "subscriptionHandle=" + subscriptionHandle +
+            '}';
+    }
+    /** Equal to any UnsubscriptionRequestItem (subclasses included, via instanceof) whose handle is equal per Objects.equals. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof UnsubscriptionRequestItem)) {
+            return false;
+        }
+        UnsubscriptionRequestItem that = (UnsubscriptionRequestItem) o;
+        return Objects.equals(subscriptionHandle, that.subscriptionHandle);
+    }
+    /** Hash derived from the handle only, consistent with equals. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(subscriptionHandle);
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java
new file mode 100644
index 0000000..8a7c659
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java
@@ -0,0 +1,29 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.types.ResponseCode;
+
+public class UnsubscriptionResponseItem {
+    // Response item for an UnsubscriptionRequestItem; declares no fields or accessors.
+    public UnsubscriptionResponseItem(UnsubscriptionRequestItem requestItem, ResponseCode responseCode) {
+        // NOTE(review): requestItem and responseCode are neither stored nor used; confirm whether this is an intentional stub.
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
index 3acbef5..9aba923 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
index e923524..9928b7d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
index e923524..445d9e9 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.model;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,16 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
-
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+/**
+ * Identifies a single subscription. Depending on the protocol in use,
+ * different data is needed to identify a subscription, so this interface
+ * is to be implemented by the individual driver implementations to hold
+ * all information needed to pull or unsubscribe any form of subscription.
+ * <p>
+ * For every subscribed item, a separate {@link SubscriptionHandle} object is
+ * returned in order to allow fine-grained unsubscription.
+ */
+public interface SubscriptionHandle {
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
similarity index 57%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
index e923524..287420b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.model;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,26 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+/**
+ * {@link SubscriptionType} specifies the nature of the subscription.
+ * In general PLC4X supports exactly 3 types of subscriptions.
+ */
+public enum SubscriptionType {
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+    /**
+     * A cyclic subscription where a value is sent in a given interval, no matter whether its value changed.
+     */
+    CYCLIC,
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+    /**
+     * Only send data if a value in the PLC changed.
+     */
+    CHANGE_OF_STATE,
+
+    /**
+     * Subscribe to events created by the PLC, which are usually defined in the PLC's application (Alarms).
+     */
+    EVENT
 
 }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
index 2304199..cf1b8d9 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
@@ -149,6 +149,8 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp
     }
 
     protected void mapAddress(SymbolicAdsAddress symbolicAdsAddress) {
+        // If the map doesn't contain an entry for the given symbolicAdsAddress,
+        // resolve it and add it to the map.
         addressMapping.computeIfAbsent(symbolicAdsAddress, symbolicAdsAddressInternal -> {
             LOGGER.debug("Resolving {}", symbolicAdsAddressInternal);
             AdsReadWriteRequest adsReadWriteRequest = AdsReadWriteRequest.of(
@@ -163,13 +165,16 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp
                 Data.of(symbolicAdsAddressInternal.getSymbolicAddress())
             );
 
+            // TODO: This is blocking, should be changed to be async.
             CompletableFuture<PlcProprietaryResponse<AdsReadWriteResponse>> getHandelFuture = new CompletableFuture<>();
             channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsReadWriteRequest), getHandelFuture));
             PlcProprietaryResponse<AdsReadWriteResponse> getHandleResponse = getFromFuture(getHandelFuture, SYMBOL_RESOLVE_TIMEOUT);
             AdsReadWriteResponse response = getHandleResponse.getResponse();
+
             if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
                 throw new PlcRuntimeException("Non error code received " + response.getResult());
             }
+
             IndexOffset symbolHandle = IndexOffset.of(response.getData().getBytes());
             return AdsAddress.of(IndexGroup.ReservedGroups.ADSIGRP_SYM_VALBYHND.getAsLong(), symbolHandle.getAsLong());
         });
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 4a04555..124d8d6 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -22,13 +22,13 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.ads.api.commands.*;
 import org.apache.plc4x.java.ads.api.commands.types.*;
 import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.api.generic.types.Invoke;
 import org.apache.plc4x.java.ads.model.AdsAddress;
+import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle;
 import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
 import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
@@ -37,11 +37,13 @@ import org.apache.plc4x.java.ads.protocol.util.LittleEndianDecoder;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
 import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.types.ResponseCode;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,10 +67,6 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
 
     private static AtomicInteger localPorts = new AtomicInteger(30000);
 
-    private final Map<Pair<Consumer<? extends PlcNotification>, Address>, Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle>> subscriberMap = new HashMap<>();
-
-    private final Map<NotificationHandle, Consumer<? extends PlcNotification>> handleConsumerMap = new HashMap<>();
-
     private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, AmsPort targetAmsPort) {
         this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), generateAMSPort());
     }
@@ -132,99 +130,129 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
     }
 
     @Override
-    public <T extends R, R> void subscribe(Consumer<PlcNotification<R>> consumer, Address address, Class<T> dataType) {
-        Objects.requireNonNull(consumer);
-        Objects.requireNonNull(address);
-        IndexGroup indexGroup;
-        IndexOffset indexOffset;
-        if (address instanceof SymbolicAdsAddress) {
-            mapAddress((SymbolicAdsAddress) address);
-            AdsAddress adsAddress = addressMapping.get(address);
-            if (adsAddress == null) {
-                throw new PlcRuntimeException("Unresolvable address" + address);
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        // TODO: Make this multi-value
+        CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();
+        if(subscriptionRequest.getNumberOfItems() == 1) {
+            SubscriptionRequestItem<?> subscriptionRequestItem = subscriptionRequest.getRequestItem().orElse(null);
+
+            Objects.requireNonNull(subscriptionRequestItem);
+            Objects.requireNonNull(subscriptionRequestItem.getConsumer());
+            Objects.requireNonNull(subscriptionRequestItem.getAddress());
+            Objects.requireNonNull(subscriptionRequestItem.getDatatype());
+
+            Address address = subscriptionRequestItem.getAddress();
+            Class<?> datatype = subscriptionRequestItem.getDatatype();
+
+            IndexGroup indexGroup;
+            IndexOffset indexOffset;
+            // If this is a symbolic address, it has to be resolved first.
+            // TODO: This is blocking, should be changed to be async.
+            if (address instanceof SymbolicAdsAddress) {
+                mapAddress((SymbolicAdsAddress) address);
+                AdsAddress adsAddress = addressMapping.get(address);
+                if (adsAddress == null) {
+                    throw new PlcRuntimeException("Unresolvable address" + address);
+                }
+                indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+                indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+            }
+            // If it's no symbolic address, we can continue immediately
+            // without having to do any resolving.
+            else if (address instanceof AdsAddress) {
+                AdsAddress adsAddress = (AdsAddress) address;
+                indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+                indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+            } else {
+                throw new IllegalArgumentException("Unsupported address type " + address.getClass());
             }
-            indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
-            indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
-        } else if (address instanceof AdsAddress) {
-            AdsAddress adsAddress = (AdsAddress) address;
-            indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
-            indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
-        } else {
-            throw new IllegalArgumentException("Unssuported address type " + address.getClass());
-        }
-        AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
-            targetAmsNetId,
-            targetAmsPort,
-            sourceAmsNetId,
-            sourceAmsPort,
-            Invoke.NONE,
-            indexGroup,
-            indexOffset,
-            LittleEndianDecoder.getLengthFor(dataType, 1),
-            TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE,
-            MaxDelay.of(0),
-            CycleTime.of(4000000)
-        );
-
-        CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
-        channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
-        PlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
-        AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
-        if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
-            throw new PlcRuntimeException("Non error code received " + response.getResult());
-        }
-        NotificationHandle notificationHandle = response.getNotificationHandle();
-        handleConsumerMap.put(notificationHandle, consumer);
-
-        Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
-            adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
-                Date timeStamp = adsStampHeader.getTimeStamp().getAsDate();
-
-                adsStampHeader.getAdsNotificationSamples()
-                    .forEach(adsNotificationSample -> {
-                        Consumer<? extends PlcNotification> plcNotificationConsumer = handleConsumerMap.get(adsNotificationSample.getNotificationHandle());
-                        if (plcNotificationConsumer == null) {
-                            LOGGER.warn("Unmapped notification received {}", adsNotificationSample);
-                            return;
-                        }
-                        Data data = adsNotificationSample.getData();
-                        try {
-                            @SuppressWarnings("unchecked")
-                            List<R> decodeData = (List<R>) LittleEndianDecoder.decodeData(dataType, data.getBytes());
-                            consumer.accept(new PlcNotification<>(timeStamp, address, decodeData));
-                        } catch (PlcProtocolException | RuntimeException e) {
-                            LOGGER.error("Can't decode {}", data, e);
-                        }
-                    });
-            });
-        subscriberMap.put(Pair.of(consumer, address), Pair.of(adsDeviceNotificationRequestConsumer, notificationHandle));
-        getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
-    }
 
-    @Override
-    public <R> void unsubscribe(Consumer<PlcNotification<R>> consumer, Address address) {
-        Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle> handlePair = subscriberMap.remove(Pair.of(consumer, address));
-        if (handlePair != null) {
-            NotificationHandle notificationHandle = handlePair.getRight();
-            AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest = AdsDeleteDeviceNotificationRequest.of(
+            // Prepare the subscription request itself.
+            AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
                 targetAmsNetId,
                 targetAmsPort,
                 sourceAmsNetId,
                 sourceAmsPort,
                 Invoke.NONE,
-                notificationHandle
+                indexGroup,
+                indexOffset,
+                LittleEndianDecoder.getLengthFor(datatype, 1),
+                TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE,
+                MaxDelay.of(0),
+                CycleTime.of(4000000)
             );
-            CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture = new CompletableFuture<>();
-            channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsDeleteDeviceNotificationRequest), deleteDeviceFuture));
 
-            PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse = getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
-            AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
+            // Send the request to the plc and wait for a response
+            // TODO: This is blocking, should be changed to be async.
+            CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
+            channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
+            PlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
+            AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
+
+            // Abort if we got anything but a successful response.
             if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
-                throw new PlcRuntimeException("Non error code received " + response.getResult());
+                throw new PlcRuntimeException("Error code received " + response.getResult());
             }
+            AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(response.getNotificationHandle());
+            future.complete(new PlcSubscriptionResponse(subscriptionRequest, Collections.singletonList(
+                new SubscriptionResponseItem<>(subscriptionRequestItem, adsSubscriptionHandle, ResponseCode.OK))));
+
+            Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
+                adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
+                    Calendar timeStamp = Calendar.getInstance();
+                    timeStamp.setTime(adsStampHeader.getTimeStamp().getAsDate());
 
-            getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(handlePair.getLeft());
-            handleConsumerMap.remove(notificationHandle);
+                    adsStampHeader.getAdsNotificationSamples()
+                        .forEach(adsNotificationSample -> {
+                            Data data = adsNotificationSample.getData();
+                            try {
+                                @SuppressWarnings("unchecked")
+                                List<?> decodeData = LittleEndianDecoder.decodeData(datatype, data.getBytes());
+                                SubscriptionEventItem subscriptionEventItem =
+                                    new SubscriptionEventItem(subscriptionRequestItem, timeStamp, decodeData);
+                                subscriptionRequestItem.getConsumer().accept(subscriptionEventItem);
+                            } catch (PlcProtocolException | RuntimeException e) {
+                                LOGGER.error("Can't decode {}", data, e);
+                            }
+                        });
+                });
+            // TODO: What's this for? Is this still needed if we use the consumers in the subscriptions?
+            getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
+        }
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        for (UnsubscriptionRequestItem unsubscriptionRequestItem : unsubscriptionRequest.getRequestItems()) {
+            Objects.requireNonNull(unsubscriptionRequestItem);
+            if(unsubscriptionRequestItem.getSubscriptionHandle() instanceof AdsSubscriptionHandle) {
+                AdsSubscriptionHandle adsSubscriptionHandle =
+                    (AdsSubscriptionHandle) unsubscriptionRequestItem.getSubscriptionHandle();
+                AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest =
+                    AdsDeleteDeviceNotificationRequest.of(
+                        targetAmsNetId,
+                        targetAmsPort,
+                        sourceAmsNetId,
+                        sourceAmsPort,
+                        Invoke.NONE,
+                        adsSubscriptionHandle.getNotificationHandle()
+                    );
+                CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture =
+                    new CompletableFuture<>();
+                channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(
+                    adsDeleteDeviceNotificationRequest), deleteDeviceFuture));
+
+                PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse =
+                    getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
+                AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
+                if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
+                    throw new PlcRuntimeException("Non error code received " + response.getResult());
+                }
+            }
         }
+        CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
+        future.complete(new PlcUnsubscriptionResponse());
+        return future;
     }
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
index e923524..2850fa9 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
@@ -16,14 +16,21 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
+package org.apache.plc4x.java.ads.model;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.ads.api.commands.types.NotificationHandle;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+public class AdsSubscriptionHandle implements SubscriptionHandle {
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+    private NotificationHandle notificationHandle;
+
+    public AdsSubscriptionHandle(NotificationHandle notificationHandle) {
+        this.notificationHandle = notificationHandle;
+    }
+
+    public NotificationHandle getNotificationHandle() {
+        return notificationHandle;
     }
 
 }
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 7d8566e..3511e32 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -22,8 +22,8 @@ import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.*;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
 import org.apache.plc4x.java.api.model.Address;
@@ -41,7 +41,7 @@ public class ManualPlc4XAdsTest {
             connectionUrl = "ads:serial:///dev/ttys003/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
         } else {
             System.out.println("Using tcp");
-            connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
+            connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.1.1.1:30000";
         }
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
             System.out.println("PlcConnection " + plcConnection);
@@ -57,11 +57,21 @@ public class ManualPlc4XAdsTest {
             System.out.println("ResponseItem " + responseItem);
             responseItem.getValues().stream().map(integer -> "Value: " + integer).forEach(System.out::println);
 
-            Consumer<PlcNotification<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
+            Consumer<SubscriptionEventItem<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
             PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
-            plcSubscriber.subscribe(notificationConsumer, address, Integer.class);
-            TimeUnit.SECONDS.sleep(5);
-            plcSubscriber.unsubscribe(notificationConsumer, address);
+            PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
+            subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(Integer.class, address, notificationConsumer));
+            CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = plcSubscriber.subscribe(subscriptionRequest);
+            PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
+            SubscriptionResponseItem subscriptionResponseItem = subscriptionResponse.getResponseItem().get();
+
+            PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
+            unsubscriptionRequest.addItem(
+                new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
+            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture =
+                plcSubscriber.unsubscribe(unsubscriptionRequest);
+            PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+            System.out.println(unsubscriptionResponse);
         }
         System.exit(0);
     }
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
index a92a65e..74fbca0 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
@@ -29,10 +29,11 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.model.AdsAddress;
 import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestChangeOfStateItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,9 +50,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import static org.hamcrest.core.IsNull.notNullValue;
 import static org.junit.Assert.*;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
+import static org.hamcrest.core.IsEqual.equalTo;
 
 public class AdsTcpPlcConnectionTests {
 
@@ -66,6 +69,7 @@ public class AdsTcpPlcConnectionTests {
     @Before
     public void setUp() throws Exception {
         SUT = AdsTcpPlcConnection.of(InetAddress.getByName("localhost"), AmsNetId.of("0.0.0.0.0.0"), AmsPort.of(13));
+        // TODO: Refactor this to use the TestChannelFactory instead.
         channelMock = mock(Channel.class, RETURNS_DEEP_STUBS);
         FieldUtils.writeField(SUT, "channel", channelMock, true);
         executorService = Executors.newFixedThreadPool(10);
@@ -115,6 +119,7 @@ public class AdsTcpPlcConnectionTests {
 
     @Test
     public void subscribe() throws Exception {
+        // TODO: Does this really test the driver implementation?
         when(channelMock.writeAndFlush(any(PlcRequestContainer.class)))
             .then(invocationOnMock -> {
                 PlcRequestContainer plcRequestContainer = invocationOnMock.getArgument(0);
@@ -158,19 +163,30 @@ public class AdsTcpPlcConnectionTests {
         when(channelMock.pipeline().get(Plc4x2AdsProtocol.class)).thenReturn(plc4x2AdsProtocol);
 
         CompletableFuture<?> notificationReceived = new CompletableFuture<>();
-        Consumer<PlcNotification<String>> plcNotificationConsumer = plcNotification -> {
+        Consumer<SubscriptionEventItem<String>> plcNotificationConsumer = plcNotification -> {
             LOGGER.info("Received {}", plcNotification);
             notificationReceived.complete(null);
         };
-        SUT.subscribe(plcNotificationConsumer, SUT.parseAddress("0/0"), String.class);
-        SUT.subscribe(plcNotificationConsumer, SUT.parseAddress("Main.by[0]"), String.class);
-        notificationReceived.get(3, TimeUnit.SECONDS);
-    }
-
-    @Test
-    public void unsubscribe() {
-        Consumer<PlcNotification<String>> plcNotificationConsumer = plcNotification -> {
-        };
-        SUT.unsubscribe(plcNotificationConsumer, SUT.parseAddress("0/0"));
+        PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
+        subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(
+            String.class, SUT.parseAddress("0/0"), plcNotificationConsumer));
+        /*subscriptionRequest.addItem(new SubscriptionRequestItem<>(
+            String.class, SUT.parseAddress("Main.by[0]"), plcNotificationConsumer));*/
+        CompletableFuture<? extends PlcSubscriptionResponse> subscriptionFuture = SUT.subscribe(subscriptionRequest);
+        PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
+        //notificationReceived.get(3, TimeUnit.SECONDS);
+        assertThat(subscriptionResponse, notNullValue());
+        assertThat(subscriptionResponse.getNumberOfItems(), equalTo(1));
+
+        // Now unsubscribe again ...
+
+        // TODO: Setup the mock to actually perform the unsubscription.
+        /*PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
+        for (SubscriptionResponseItem<?> subscriptionResponseItem : subscriptionResponse.getResponseItems()) {
+            unsubscriptionRequest.addItem(subscriptionResponseItem.getSubscriptionHandle());
+        }
+        CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = SUT.unsubscribe(unsubscriptionRequest);
+        PlcUnsubscriptionResponse plcUnsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+        assertThat(plcUnsubscriptionResponse, notNullValue());*/
     }
 }
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e2796f4..ab4629a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -316,6 +316,9 @@
 
             <!-- Jenkins build related files -->
             <exclude>.repository/**</exclude>
+
+            <!-- Data files created by examples running an embedded elasticsearch -->
+            <exclude>elasticsearch-data/**</exclude>
           </excludes>
         </configuration>
       </plugin>