You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/07/11 14:57:50 UTC

[incubator-plc4x] 01/01: PLC4X-40 - Refactor the PlcSubscriber to be more aligned with the PlcReader and PlcWriter

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 748a39d8f3cdbabb9f4e086f1aaa4877ed042dd4
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Jul 11 16:57:39 2018 +0200

    PLC4X-40 - Refactor the PlcSubscriber to be more aligned with the PlcReader and PlcWriter
---
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java |  31 ++-
 .../java/org/apache/plc4x/camel/ManualTest.java    |   2 +-
 .../java/org/apache/plc4x/camel/MockDriver.java    |  57 ++++--
 .../plc4x/java/api/connection/PlcSubscriber.java   |  24 +--
 .../plc4x/java/api/messages/PlcNotification.java   |  80 --------
 .../apache/plc4x/java/api/messages/PlcRequest.java |   2 +-
 .../java/api/messages/PlcRequestContainer.java     |   8 +
 ...ResponseItem.java => PlcSubscriptionEvent.java} |  18 +-
 ...sponseItem.java => PlcSubscriptionRequest.java} |  10 +-
 ...ponseItem.java => PlcSubscriptionResponse.java} |  13 +-
 .../api/messages/PlcUnsubscriptionRequest.java     |  54 ++++++
 ...nseItem.java => PlcUnsubscriptionResponse.java} |  11 +-
 .../java/api/messages/items/ReadRequestItem.java   |  32 ++--
 .../java/api/messages/items/ReadResponseItem.java  |  28 +--
 .../plc4x/java/api/messages/items/RequestItem.java |  28 +--
 .../java/api/messages/items/ResponseItem.java      |  28 +--
 .../api/messages/items/SubscriptionEventItem.java  |  48 +++++
 ...a => SubscriptionRequestChangeOfStateItem.java} |  11 +-
 ...tem.java => SubscriptionRequestCyclicItem.java} |  25 ++-
 ...Item.java => SubscriptionRequestEventItem.java} |  11 +-
 .../messages/items/SubscriptionRequestItem.java    |  45 +++++
 .../messages/items/SubscriptionResponseItem.java   |  37 ++++
 .../messages/items/UnsubscriptionRequestItem.java  |  62 ++++++
 .../messages/items/UnsubscriptionResponseItem.java |  29 +++
 .../java/api/messages/items/WriteRequestItem.java  |  28 +--
 .../java/api/messages/items/WriteResponseItem.java |  28 +--
 .../SubscriptionHandle.java}                       |  19 +-
 .../SubscriptionType.java}                         |  25 ++-
 .../ads/connection/AdsAbstractPlcConnection.java   |   5 +
 .../java/ads/connection/AdsTcpPlcConnection.java   | 208 ++++++++++++---------
 .../java/ads/model/AdsSubscriptionHandle.java}     |  17 +-
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  |  24 ++-
 .../ads/connection/AdsTcpPlcConnectionTests.java   |  46 +++--
 pom.xml                                            |   3 +
 34 files changed, 722 insertions(+), 375 deletions(-)

diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
index 08891e7..16fc626 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -26,14 +26,21 @@ import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.items.*;
 import org.apache.plc4x.java.api.model.Address;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<PlcNotification<Object>> {
+public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<SubscriptionEventItem> {
     private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class);
 
     private Plc4XEndpoint endpoint;
@@ -42,6 +49,7 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     private PlcConnection plcConnection;
     private Address address;
     private Class<?> dataType;
+    private PlcSubscriptionResponse subscriptionResponse;
 
 
     public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
@@ -73,13 +81,20 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     }
 
     @Override
-    protected void doStart() {
-        getSubscriber().subscribe(this, address, dataType);
+    protected void doStart() throws InterruptedException, ExecutionException, TimeoutException {
+        PlcSubscriptionRequest request = new PlcSubscriptionRequest();
+        request.addItem(new SubscriptionRequestCyclicItem(dataType, address, this, TimeUnit.SECONDS, 3));
+        CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = getSubscriber().subscribe(request);
+        subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
     }
 
     @Override
     protected void doStop() {
-        getSubscriber().unsubscribe(this, address);
+        PlcUnsubscriptionRequest request = new PlcUnsubscriptionRequest();
+        subscriptionResponse.getResponseItems().stream()
+            .map(SubscriptionResponseItem::getSubscriptionHandle)
+            .map(UnsubscriptionRequestItem::new)
+            .forEach(request::addItem);
         try {
             plcConnection.close();
         } catch (Exception e) {
@@ -92,11 +107,11 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
     }
 
     @Override
-    public void accept(PlcNotification<Object> plcNotification) {
-        LOGGER.debug("Received {}", plcNotification);
+    public void accept(SubscriptionEventItem subscriptionEventItem) {
+        LOGGER.debug("Received {}", subscriptionEventItem);
         try {
             Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setBody(unwrapIfSingle(plcNotification.getValues()));
+            exchange.getIn().setBody(unwrapIfSingle(subscriptionEventItem.getValues()));
             processor.process(exchange);
         } catch (Exception e) {
             exceptionHandler.handleException(e);
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
index cd96f9f..9eff617 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/ManualTest.java
@@ -53,7 +53,7 @@ public class ManualTest {
     private static class MyRouteBuilder extends RouteBuilder {
         @Override
         public void configure() {
-            from("plc4x:ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000?dataType=java.lang.Integer&address=Allgemein_S2.Station")
+            from("plc4x:ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.3.1.1:30000?dataType=java.lang.Integer&address=Allgemein_S2.Station")
                 .process(exchange -> System.out.println("Invoked timer at " + new Date()))
                 .bean("foo")
                 .log("Received ${body}");
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index dca996b..f7c00cb 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -25,18 +25,27 @@ import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+import org.apache.plc4x.java.api.types.ResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Calendar;
 import java.util.Collections;
-import java.util.Date;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.mockito.Mockito.*;
 
@@ -58,6 +67,7 @@ public class MockDriver implements PlcDriver {
 
     @Override
     public PlcConnection connect(String url) {
+        // Mock a connection.
         PlcConnection plcConnectionMock = mock(PlcConnection.class, RETURNS_DEEP_STUBS);
         try {
             when(plcConnectionMock.parseAddress(anyString())).thenReturn(mock(Address.class));
@@ -65,29 +75,40 @@ public class MockDriver implements PlcDriver {
             throw new RuntimeException(e);
         }
         when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS)));
+
+        // Mock a typical subscriber.
         PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS);
-        doAnswer(invocation -> {
+        when(plcSubscriber.subscribe(any())).thenAnswer(invocation -> {
             LOGGER.info("Received {}", invocation);
-            Consumer consumer = invocation.getArgument(0);
-            executorService.submit(() -> {
-                while (!Thread.currentThread().isInterrupted()) {
-                    consumer.accept(new PlcNotification(new Date(), mock(Address.class), Collections.singletonList("HelloWorld")));
-                    try {
-                        TimeUnit.MILLISECONDS.sleep(100);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
-                }
-            });
-            return null;
-        }).when(plcSubscriber).subscribe(any(), any(), any());
+            PlcSubscriptionRequest subscriptionRequest = invocation.getArgument(0);
+            List<SubscriptionResponseItem<?>> responseItems =
+                subscriptionRequest.getRequestItems().stream().map(subscriptionRequestItem -> {
+                    Consumer consumer = subscriptionRequestItem.getConsumer();
+                    executorService.submit(() -> {
+                        while (!Thread.currentThread().isInterrupted()) {
+                            consumer.accept(new SubscriptionEventItem<>(null, Calendar.getInstance(), Collections.singletonList("HelloWorld")));
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(100);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    });
+                    return new SubscriptionResponseItem<>(subscriptionRequestItem,
+                        mock(SubscriptionHandle.class, RETURNS_DEEP_STUBS), ResponseCode.OK);
+                }).collect(Collectors.toList());
+            PlcSubscriptionResponse response = new PlcSubscriptionResponse(subscriptionRequest, responseItems);
+            CompletableFuture<PlcSubscriptionResponse> responseFuture = new CompletableFuture<>();
+            responseFuture.complete(response);
+            return responseFuture;
+        });
         when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber));
         return plcConnectionMock;
     }
 
     @Override
-    public PlcConnection connect(String url, PlcAuthentication authentication) throws PlcConnectionException {
+    public PlcConnection connect(String url, PlcAuthentication authentication) {
         return connect(null);
     }
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
index 4296df3..cd52d48 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
@@ -18,9 +18,10 @@
  */
 package org.apache.plc4x.java.api.connection;
 
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.model.Address;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 /**
@@ -29,21 +30,20 @@ import java.util.function.Consumer;
 public interface PlcSubscriber {
 
     /**
-     * Subscribes a {@code consumer} to a {@code address} parsing values as {@code dataType}.
-     * {@code consumer} and {@code address} are used as unique identification.
+     * Subscribes to addresses on the PLC.
      *
-     * @param consumer to be subscribed.
-     * @param address  to be read.
-     * @param dataType to be decoded.
+     * @param subscriptionRequest subscription request containing at least one subscription request item.
+     * @return subscription response containing a subscription response item for each subscription request item.
      */
-    <T extends R, R> void subscribe(Consumer<PlcNotification<R>> consumer, Address address, Class<T> dataType);
-
+    CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest);
 
     /**
-     * Unsubscribes a {@code consumer}.
-     * {@code consumer} and {@code address} are used as unique identification.
+     * Unsubscribes from addresses on the PLC. For unsubscribing the unsubscription request uses the subscription
+     * handle returned as part of the subscription response item.
      *
-     * @param consumer to be unsubscribed.
+     * @param unsubscriptionRequest unsubscription request containing at least one unsubscription request item.
+     * @return unsubscription response containing a unsubscription response item for each unsubscription request item.
      */
-    <R> void unsubscribe(Consumer<PlcNotification<R>> consumer, Address address);
+    CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest);
+
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
deleted file mode 100644
index ce68982..0000000
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.api.messages;
-
-import org.apache.plc4x.java.api.model.Address;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Objects;
-
-public class PlcNotification<T> {
-
-    protected final Date timeStamp;
-
-    protected final Address address;
-
-    protected final List<T> values;
-
-    public PlcNotification(Date timeStamp, Address address, List<T> values) {
-        this.timeStamp = timeStamp;
-        this.address = address;
-        this.values = values;
-    }
-
-    public Date getTimeStamp() {
-        return timeStamp;
-    }
-
-    public Address getAddress() {
-        return address;
-    }
-
-    public List<T> getValues() {
-        return values;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof PlcNotification)) {
-            return false;
-        }
-        PlcNotification<?> that = (PlcNotification<?>) o;
-        return Objects.equals(timeStamp, that.timeStamp) &&
-            Objects.equals(address, that.address) &&
-            Objects.equals(values, that.values);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(timeStamp, address, values);
-    }
-
-    @Override
-    public String toString() {
-        return "PlcNotification{" +
-            "timeStamp=" + timeStamp +
-            ", address=" + address +
-            ", values=" + values +
-            '}';
-    }
-}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
index 3ad5fc1..3ce851b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
@@ -101,7 +101,7 @@ public abstract class PlcRequest<REQUEST_ITEM extends RequestItem> implements Pl
     @Override
     public String toString() {
         return "PlcRequest{" +
-            "requestItems=" + requestItems +
+            "eventItems=" + requestItems +
             '}';
     }
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
index 4168246..f7259f6 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequestContainer.java
@@ -21,6 +21,14 @@ package org.apache.plc4x.java.api.messages;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
+/**
+ * Helper mostly used internally to split up big requests into multiple sub-requests.
+ *
+ * TODO: Think about moving this into one of the driver-base modules.
+ *
+ * @param <T>
+ * @param <R>
+ */
 public class PlcRequestContainer<T extends PlcRequest, R extends PlcResponse> implements ProtocolMessage {
 
     private final T request;
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
similarity index 63%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
index e923524..e8b1d0d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionEvent.java
@@ -1,3 +1,9 @@
+package org.apache.plc4x.java.api.messages;
+
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+
+import java.util.List;
+
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +22,16 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
+public class PlcSubscriptionEvent {
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+    protected final List<SubscriptionEventItem<?>> eventItems;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+    public PlcSubscriptionEvent(List<SubscriptionEventItem<?>> eventItems) {
+        this.eventItems = eventItems;
+    }
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+    public List<SubscriptionEventItem<?>> getEventItems() {
+        return eventItems;
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
similarity index 70%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
index e923524..65b0659 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,9 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+public class PlcSubscriptionRequest extends PlcRequest<SubscriptionRequestItem<?>> {
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
index e923524..3c271df 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionResponse.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,16 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.List;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class PlcSubscriptionResponse extends PlcResponse<PlcSubscriptionRequest, SubscriptionResponseItem<?>, SubscriptionRequestItem<?>> {
+
+    public PlcSubscriptionResponse(PlcSubscriptionRequest request, List<SubscriptionResponseItem<?>> subscriptionResponseItems) {
+        super(request, subscriptionResponseItems);
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
new file mode 100644
index 0000000..94377d3
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.api.messages;
+
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+public class PlcUnsubscriptionRequest implements PlcMessage {
+
+    protected final List<UnsubscriptionRequestItem> requestItems;
+
+    public PlcUnsubscriptionRequest() {
+        this.requestItems = new LinkedList<>();
+    }
+
+    public PlcUnsubscriptionRequest(List<UnsubscriptionRequestItem> requestItems) {
+        Objects.requireNonNull(requestItems, "Request items must not be null");
+        this.requestItems = requestItems;
+    }
+
+    public void addItem(UnsubscriptionRequestItem unsubscriptionRequestItem) {
+        Objects.requireNonNull(unsubscriptionRequestItem, "Request item must not be null");
+        getRequestItems().add(unsubscriptionRequestItem);
+    }
+
+    public List<UnsubscriptionRequestItem> getRequestItems() {
+        return requestItems;
+    }
+
+    public int getNumberOfItems() {
+        return getRequestItems().size();
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
similarity index 70%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
index e923524..f75415a 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.messages;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,6 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
-
-import org.apache.plc4x.java.api.types.ResponseCode;
-
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+public class PlcUnsubscriptionResponse implements PlcMessage {
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
index 222310e..bc77dc5 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadRequestItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
index 9d5a4d6..f7736e8 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ReadResponseItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
index 4df0e4e..39a3ec2 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/RequestItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
index 34f61b1..5cb6d9b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/ResponseItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
new file mode 100644
index 0000000..49a0200
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionEventItem.java
@@ -0,0 +1,48 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import java.util.Calendar;
+import java.util.List;
+
+public class SubscriptionEventItem<T> {
+
+    private SubscriptionRequestItem<T> subscriptionRequestItem;
+    private Calendar timestamp;
+    private List<T> values;
+
+    public SubscriptionEventItem(SubscriptionRequestItem<T> subscriptionRequestItem, Calendar timestamp, List<T> values) {
+        this.subscriptionRequestItem = subscriptionRequestItem;
+        this.timestamp = timestamp;
+        this.values = values;
+    }
+
+    public SubscriptionRequestItem<T> getSubscriptionRequestItem() {
+        return subscriptionRequestItem;
+    }
+
+    public Calendar getTimestamp() {
+        return timestamp;
+    }
+
+    public List<T> getValues() {
+        return values;
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
similarity index 66%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
index e923524..57db70f 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java
@@ -18,12 +18,15 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.function.Consumer;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class SubscriptionRequestChangeOfStateItem extends SubscriptionRequestItem {
+
+    public SubscriptionRequestChangeOfStateItem(Class datatype, Address address, Consumer consumer) {
+        super(datatype, address, SubscriptionType.CHANGE_OF_STATE, consumer);
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
similarity index 54%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
index e923524..b1a491b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java
@@ -18,12 +18,29 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class SubscriptionRequestCyclicItem extends SubscriptionRequestItem {
+
+    private TimeUnit timeUnit;
+    private int period;
+
+    public SubscriptionRequestCyclicItem(Class datatype, Address address, Consumer consumer, TimeUnit timeUnit, int period) {
+        super(datatype, address, SubscriptionType.CYCLIC, consumer);
+        this.timeUnit = timeUnit;
+        this.period = period;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public int getPeriod() {
+        return period;
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
similarity index 68%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
index e923524..8ed69c5 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java
@@ -18,12 +18,15 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+import java.util.function.Consumer;
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+public class SubscriptionRequestEventItem extends SubscriptionRequestItem {
+
+    public SubscriptionRequestEventItem(Class datatype, Address address, Consumer consumer) {
+        super(datatype, address, SubscriptionType.EVENT, consumer);
     }
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
new file mode 100644
index 0000000..7591e09
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestItem.java
@@ -0,0 +1,45 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.model.SubscriptionType;
+
+import java.util.function.Consumer;
+
+public abstract class SubscriptionRequestItem<T> extends RequestItem<T> {
+
+    private SubscriptionType subscriptionType;
+    private Consumer<SubscriptionEventItem<T>> consumer;
+
+    public SubscriptionRequestItem(Class<T> datatype, Address address, SubscriptionType subscriptionType, Consumer<SubscriptionEventItem<T>> consumer) {
+        super(datatype, address);
+        this.subscriptionType = subscriptionType;
+        this.consumer = consumer;
+    }
+
+    public SubscriptionType getSubscriptionType() {
+        return subscriptionType;
+    }
+
+    public Consumer<SubscriptionEventItem<T>> getConsumer() {
+        return consumer;
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java
new file mode 100644
index 0000000..c38e28b
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionResponseItem.java
@@ -0,0 +1,37 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+import org.apache.plc4x.java.api.types.ResponseCode;
+
+public class SubscriptionResponseItem<T> extends ResponseItem<SubscriptionRequestItem<T>>  {
+
+    private SubscriptionHandle subscriptionHandle;
+
+    public SubscriptionResponseItem(SubscriptionRequestItem<T> requestItem, SubscriptionHandle subscriptionHandle, ResponseCode responseCode) {
+        super(requestItem, responseCode);
+        this.subscriptionHandle = subscriptionHandle;
+    }
+
+    public SubscriptionHandle getSubscriptionHandle() {
+        return subscriptionHandle;
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java
new file mode 100644
index 0000000..d3312f3
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionRequestItem.java
@@ -0,0 +1,62 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
+
+import java.util.Objects;
+
+public class UnsubscriptionRequestItem {
+
+    private SubscriptionHandle subscriptionHandle;
+
+    public UnsubscriptionRequestItem(SubscriptionHandle subscriptionHandle) {
+        Objects.requireNonNull(subscriptionHandle, "SubscriptionHandle must not be null");
+        this.subscriptionHandle = subscriptionHandle;
+    }
+
+    public SubscriptionHandle getSubscriptionHandle() {
+        return subscriptionHandle;
+    }
+
+    @Override
+    public String toString() {
+        return "UnsubscriptionRequestItem{" +
+            "subscriptionHandle=" + subscriptionHandle +
+            '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof UnsubscriptionRequestItem)) {
+            return false;
+        }
+        UnsubscriptionRequestItem that = (UnsubscriptionRequestItem) o;
+        return Objects.equals(subscriptionHandle, that.subscriptionHandle);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(subscriptionHandle);
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java
new file mode 100644
index 0000000..8a7c659
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/UnsubscriptionResponseItem.java
@@ -0,0 +1,29 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+package org.apache.plc4x.java.api.messages.items;
+
+import org.apache.plc4x.java.api.types.ResponseCode;
+
+public class UnsubscriptionResponseItem {
+
+    public UnsubscriptionResponseItem(UnsubscriptionRequestItem requestItem, ResponseCode responseCode) {
+
+    }
+
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
index 3acbef5..9aba923 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteRequestItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
index e923524..9928b7d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
@@ -1,20 +1,20 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
 */
 package org.apache.plc4x.java.api.messages.items;
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
index e923524..445d9e9 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionHandle.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.model;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,16 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
-
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
-
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+/**
+ * When subscribing to remote resources, depending on the used protocol
+ * different data is used to identify a subscription. This interface is
+ * to be implemented in the individual Driver implementations to contain
+ * all information needed to pull or unsubscribe any form of subscription.
+ *
+ * For every subscribed item, a separate {@link SubscriptionHandle} object is
+ * returned in order to allow fine granular unsubscriptions.
+ */
+public interface SubscriptionHandle {
 
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
similarity index 57%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
index e923524..287420b 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/SubscriptionType.java
@@ -1,3 +1,4 @@
+package org.apache.plc4x.java.api.model;
 /*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -16,14 +17,26 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+/**
+ * {@link SubscriptionType} specifies the nature of the subscription.
+ * In general PLC4X supports exactly 3 types of subscriptions.
+ */
+public enum SubscriptionType {
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+    /**
+     * A cyclic subscription where a value is sent no matter if it's value changed in a given interval.
+     */
+    CYCLIC,
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
-    }
+    /**
+     * Only send data, if a value in the PLC changed.
+     */
+    CHANGE_OF_STATE,
+
+    /**
+     * Subscribe to events created by the PLC which usually are defined in the PLCs application (Alarms).
+     */
+    EVENT
 
 }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
index 2304199..cf1b8d9 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
@@ -149,6 +149,8 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp
     }
 
     protected void mapAddress(SymbolicAdsAddress symbolicAdsAddress) {
+        // If the map doesn't contain an entry for the given symbolicAdsAddress,
+        // resolve it and add it to the map.
         addressMapping.computeIfAbsent(symbolicAdsAddress, symbolicAdsAddressInternal -> {
             LOGGER.debug("Resolving {}", symbolicAdsAddressInternal);
             AdsReadWriteRequest adsReadWriteRequest = AdsReadWriteRequest.of(
@@ -163,13 +165,16 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp
                 Data.of(symbolicAdsAddressInternal.getSymbolicAddress())
             );
 
+            // TODO: This is blocking, should be changed to be async.
             CompletableFuture<PlcProprietaryResponse<AdsReadWriteResponse>> getHandelFuture = new CompletableFuture<>();
             channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsReadWriteRequest), getHandelFuture));
             PlcProprietaryResponse<AdsReadWriteResponse> getHandleResponse = getFromFuture(getHandelFuture, SYMBOL_RESOLVE_TIMEOUT);
             AdsReadWriteResponse response = getHandleResponse.getResponse();
+
             if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
                 throw new PlcRuntimeException("Non error code received " + response.getResult());
             }
+
             IndexOffset symbolHandle = IndexOffset.of(response.getData().getBytes());
             return AdsAddress.of(IndexGroup.ReservedGroups.ADSIGRP_SYM_VALBYHND.getAsLong(), symbolHandle.getAsLong());
         });
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 4a04555..124d8d6 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -22,13 +22,13 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.ads.api.commands.*;
 import org.apache.plc4x.java.ads.api.commands.types.*;
 import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.api.generic.types.Invoke;
 import org.apache.plc4x.java.ads.model.AdsAddress;
+import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle;
 import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
 import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
@@ -37,11 +37,13 @@ import org.apache.plc4x.java.ads.protocol.util.LittleEndianDecoder;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
+import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem;
 import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.types.ResponseCode;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,10 +67,6 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
 
     private static AtomicInteger localPorts = new AtomicInteger(30000);
 
-    private final Map<Pair<Consumer<? extends PlcNotification>, Address>, Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle>> subscriberMap = new HashMap<>();
-
-    private final Map<NotificationHandle, Consumer<? extends PlcNotification>> handleConsumerMap = new HashMap<>();
-
     private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, AmsPort targetAmsPort) {
         this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), generateAMSPort());
     }
@@ -132,99 +130,129 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
     }
 
     @Override
-    public <T extends R, R> void subscribe(Consumer<PlcNotification<R>> consumer, Address address, Class<T> dataType) {
-        Objects.requireNonNull(consumer);
-        Objects.requireNonNull(address);
-        IndexGroup indexGroup;
-        IndexOffset indexOffset;
-        if (address instanceof SymbolicAdsAddress) {
-            mapAddress((SymbolicAdsAddress) address);
-            AdsAddress adsAddress = addressMapping.get(address);
-            if (adsAddress == null) {
-                throw new PlcRuntimeException("Unresolvable address" + address);
+    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        // TODO: Make this multi-value
+        CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();
+        if(subscriptionRequest.getNumberOfItems() == 1) {
+            SubscriptionRequestItem<?> subscriptionRequestItem = subscriptionRequest.getRequestItem().orElse(null);
+
+            Objects.requireNonNull(subscriptionRequestItem);
+            Objects.requireNonNull(subscriptionRequestItem.getConsumer());
+            Objects.requireNonNull(subscriptionRequestItem.getAddress());
+            Objects.requireNonNull(subscriptionRequestItem.getDatatype());
+
+            Address address = subscriptionRequestItem.getAddress();
+            Class<?> datatype = subscriptionRequestItem.getDatatype();
+
+            IndexGroup indexGroup;
+            IndexOffset indexOffset;
+            // If this is a symbolic address, it has to be resolved first.
+            // TODO: This is blocking, should be changed to be async.
+            if (address instanceof SymbolicAdsAddress) {
+                mapAddress((SymbolicAdsAddress) address);
+                AdsAddress adsAddress = addressMapping.get(address);
+                if (adsAddress == null) {
+                    throw new PlcRuntimeException("Unresolvable address" + address);
+                }
+                indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+                indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+            }
+            // If it's no symbolic address, we can continue immediately
+            // without having to do any resolving.
+            else if (address instanceof AdsAddress) {
+                AdsAddress adsAddress = (AdsAddress) address;
+                indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+                indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+            } else {
+                throw new IllegalArgumentException("Unsupported address type " + address.getClass());
             }
-            indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
-            indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
-        } else if (address instanceof AdsAddress) {
-            AdsAddress adsAddress = (AdsAddress) address;
-            indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
-            indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
-        } else {
-            throw new IllegalArgumentException("Unssuported address type " + address.getClass());
-        }
-        AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
-            targetAmsNetId,
-            targetAmsPort,
-            sourceAmsNetId,
-            sourceAmsPort,
-            Invoke.NONE,
-            indexGroup,
-            indexOffset,
-            LittleEndianDecoder.getLengthFor(dataType, 1),
-            TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE,
-            MaxDelay.of(0),
-            CycleTime.of(4000000)
-        );
-
-        CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
-        channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
-        PlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
-        AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
-        if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
-            throw new PlcRuntimeException("Non error code received " + response.getResult());
-        }
-        NotificationHandle notificationHandle = response.getNotificationHandle();
-        handleConsumerMap.put(notificationHandle, consumer);
-
-        Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
-            adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
-                Date timeStamp = adsStampHeader.getTimeStamp().getAsDate();
-
-                adsStampHeader.getAdsNotificationSamples()
-                    .forEach(adsNotificationSample -> {
-                        Consumer<? extends PlcNotification> plcNotificationConsumer = handleConsumerMap.get(adsNotificationSample.getNotificationHandle());
-                        if (plcNotificationConsumer == null) {
-                            LOGGER.warn("Unmapped notification received {}", adsNotificationSample);
-                            return;
-                        }
-                        Data data = adsNotificationSample.getData();
-                        try {
-                            @SuppressWarnings("unchecked")
-                            List<R> decodeData = (List<R>) LittleEndianDecoder.decodeData(dataType, data.getBytes());
-                            consumer.accept(new PlcNotification<>(timeStamp, address, decodeData));
-                        } catch (PlcProtocolException | RuntimeException e) {
-                            LOGGER.error("Can't decode {}", data, e);
-                        }
-                    });
-            });
-        subscriberMap.put(Pair.of(consumer, address), Pair.of(adsDeviceNotificationRequestConsumer, notificationHandle));
-        getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
-    }
 
-    @Override
-    public <R> void unsubscribe(Consumer<PlcNotification<R>> consumer, Address address) {
-        Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle> handlePair = subscriberMap.remove(Pair.of(consumer, address));
-        if (handlePair != null) {
-            NotificationHandle notificationHandle = handlePair.getRight();
-            AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest = AdsDeleteDeviceNotificationRequest.of(
+            // Prepare the subscription request itself.
+            AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
                 targetAmsNetId,
                 targetAmsPort,
                 sourceAmsNetId,
                 sourceAmsPort,
                 Invoke.NONE,
-                notificationHandle
+                indexGroup,
+                indexOffset,
+                LittleEndianDecoder.getLengthFor(datatype, 1),
+                TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE,
+                MaxDelay.of(0),
+                CycleTime.of(4000000)
             );
-            CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture = new CompletableFuture<>();
-            channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsDeleteDeviceNotificationRequest), deleteDeviceFuture));
 
-            PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse = getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
-            AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
+            // Send the request to the plc and wait for a response
+            // TODO: This is blocking, should be changed to be async.
+            CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>();
+            channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
+            PlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
+            AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();
+
+            // Abort if we got anything but a successful response.
             if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
-                throw new PlcRuntimeException("Non error code received " + response.getResult());
+                throw new PlcRuntimeException("Error code received " + response.getResult());
             }
+            AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(response.getNotificationHandle());
+            future.complete(new PlcSubscriptionResponse(subscriptionRequest, Collections.singletonList(
+                new SubscriptionResponseItem<>(subscriptionRequestItem, adsSubscriptionHandle, ResponseCode.OK))));
+
+            Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
+                adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
+                    Calendar timeStamp = Calendar.getInstance();
+                    timeStamp.setTime(adsStampHeader.getTimeStamp().getAsDate());
 
-            getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(handlePair.getLeft());
-            handleConsumerMap.remove(notificationHandle);
+                    adsStampHeader.getAdsNotificationSamples()
+                        .forEach(adsNotificationSample -> {
+                            Data data = adsNotificationSample.getData();
+                            try {
+                                @SuppressWarnings("unchecked")
+                                List<?> decodeData = LittleEndianDecoder.decodeData(datatype, data.getBytes());
+                                SubscriptionEventItem subscriptionEventItem =
+                                    new SubscriptionEventItem(subscriptionRequestItem, timeStamp, decodeData);
+                                subscriptionRequestItem.getConsumer().accept(subscriptionEventItem);
+                            } catch (PlcProtocolException | RuntimeException e) {
+                                LOGGER.error("Can't decode {}", data, e);
+                            }
+                        });
+                });
+            // TODO: What's this for? Is this still needed if we use the consumers in the subscriptions?
+            getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
+        }
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        for (UnsubscriptionRequestItem unsubscriptionRequestItem : unsubscriptionRequest.getRequestItems()) {
+            Objects.requireNonNull(unsubscriptionRequestItem);
+            if(unsubscriptionRequestItem.getSubscriptionHandle() instanceof AdsSubscriptionHandle) {
+                AdsSubscriptionHandle adsSubscriptionHandle =
+                    (AdsSubscriptionHandle) unsubscriptionRequestItem.getSubscriptionHandle();
+                AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest =
+                    AdsDeleteDeviceNotificationRequest.of(
+                        targetAmsNetId,
+                        targetAmsPort,
+                        sourceAmsNetId,
+                        sourceAmsPort,
+                        Invoke.NONE,
+                        adsSubscriptionHandle.getNotificationHandle()
+                    );
+                CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture =
+                    new CompletableFuture<>();
+                channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(
+                    adsDeleteDeviceNotificationRequest), deleteDeviceFuture));
+
+                PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse =
+                    getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
+                AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
+                if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
+                    throw new PlcRuntimeException("Non error code received " + response.getResult());
+                }
+            }
         }
+        CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
+        future.complete(new PlcUnsubscriptionResponse());
+        return future;
     }
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
similarity index 59%
copy from plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
copy to plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
index e923524..2850fa9 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/WriteResponseItem.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
@@ -16,14 +16,21 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.java.api.messages.items;
+package org.apache.plc4x.java.ads.model;
 
-import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.java.ads.api.commands.types.NotificationHandle;
+import org.apache.plc4x.java.api.model.SubscriptionHandle;
 
-public class WriteResponseItem<T> extends ResponseItem<WriteRequestItem<T>> {
+public class AdsSubscriptionHandle implements SubscriptionHandle {
 
-    public WriteResponseItem(WriteRequestItem<T> requestItem, ResponseCode responseCode) {
-        super(requestItem, responseCode);
+    private NotificationHandle notificationHandle;
+
+    public AdsSubscriptionHandle(NotificationHandle notificationHandle) {
+        this.notificationHandle = notificationHandle;
+    }
+
+    public NotificationHandle getNotificationHandle() {
+        return notificationHandle;
     }
 
 }
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 7d8566e..3511e32 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -22,8 +22,8 @@ import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.*;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
 import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
 import org.apache.plc4x.java.api.model.Address;
@@ -41,7 +41,7 @@ public class ManualPlc4XAdsTest {
             connectionUrl = "ads:serial:///dev/ttys003/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
         } else {
             System.out.println("Using tcp");
-            connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
+            connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.1.1.1:30000";
         }
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
             System.out.println("PlcConnection " + plcConnection);
@@ -57,11 +57,21 @@ public class ManualPlc4XAdsTest {
             System.out.println("ResponseItem " + responseItem);
             responseItem.getValues().stream().map(integer -> "Value: " + integer).forEach(System.out::println);
 
-            Consumer<PlcNotification<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
+            Consumer<SubscriptionEventItem<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification);
             PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available"));
-            plcSubscriber.subscribe(notificationConsumer, address, Integer.class);
-            TimeUnit.SECONDS.sleep(5);
-            plcSubscriber.unsubscribe(notificationConsumer, address);
+            PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
+            subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(Integer.class, address, notificationConsumer));
+            CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = plcSubscriber.subscribe(subscriptionRequest);
+            PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
+            SubscriptionResponseItem subscriptionResponseItem = subscriptionResponse.getResponseItem().get();
+
+            PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
+            unsubscriptionRequest.addItem(
+                new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle()));
+            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture =
+                plcSubscriber.unsubscribe(unsubscriptionRequest);
+            PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+            System.out.println(unsubscriptionResponse);
         }
         System.exit(0);
     }
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
index a92a65e..74fbca0 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
@@ -29,10 +29,11 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.model.AdsAddress;
 import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
-import org.apache.plc4x.java.api.messages.PlcNotification;
-import org.apache.plc4x.java.api.messages.PlcProprietaryRequest;
-import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestChangeOfStateItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem;
+import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,9 +50,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import static org.hamcrest.core.IsNull.notNullValue;
 import static org.junit.Assert.*;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
+import static org.hamcrest.core.IsEqual.equalTo;
 
 public class AdsTcpPlcConnectionTests {
 
@@ -66,6 +69,7 @@ public class AdsTcpPlcConnectionTests {
     @Before
     public void setUp() throws Exception {
         SUT = AdsTcpPlcConnection.of(InetAddress.getByName("localhost"), AmsNetId.of("0.0.0.0.0.0"), AmsPort.of(13));
+        // TODO: Refactor this to use the TestChannelFactory instead.
         channelMock = mock(Channel.class, RETURNS_DEEP_STUBS);
         FieldUtils.writeField(SUT, "channel", channelMock, true);
         executorService = Executors.newFixedThreadPool(10);
@@ -115,6 +119,7 @@ public class AdsTcpPlcConnectionTests {
 
     @Test
     public void subscribe() throws Exception {
+        // TODO: Does this really test the driver implementation?
         when(channelMock.writeAndFlush(any(PlcRequestContainer.class)))
             .then(invocationOnMock -> {
                 PlcRequestContainer plcRequestContainer = invocationOnMock.getArgument(0);
@@ -158,19 +163,30 @@ public class AdsTcpPlcConnectionTests {
         when(channelMock.pipeline().get(Plc4x2AdsProtocol.class)).thenReturn(plc4x2AdsProtocol);
 
         CompletableFuture<?> notificationReceived = new CompletableFuture<>();
-        Consumer<PlcNotification<String>> plcNotificationConsumer = plcNotification -> {
+        Consumer<SubscriptionEventItem<String>> plcNotificationConsumer = plcNotification -> {
             LOGGER.info("Received {}", plcNotification);
             notificationReceived.complete(null);
         };
-        SUT.subscribe(plcNotificationConsumer, SUT.parseAddress("0/0"), String.class);
-        SUT.subscribe(plcNotificationConsumer, SUT.parseAddress("Main.by[0]"), String.class);
-        notificationReceived.get(3, TimeUnit.SECONDS);
-    }
-
-    @Test
-    public void unsubscribe() {
-        Consumer<PlcNotification<String>> plcNotificationConsumer = plcNotification -> {
-        };
-        SUT.unsubscribe(plcNotificationConsumer, SUT.parseAddress("0/0"));
+        PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest();
+        subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(
+            String.class, SUT.parseAddress("0/0"), plcNotificationConsumer));
+        /*subscriptionRequest.addItem(new SubscriptionRequestItem<>(
+            String.class, SUT.parseAddress("Main.by[0]"), plcNotificationConsumer));*/
+        CompletableFuture<? extends PlcSubscriptionResponse> subscriptionFuture = SUT.subscribe(subscriptionRequest);
+        PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS);
+        //notificationReceived.get(3, TimeUnit.SECONDS);
+        assertThat(subscriptionResponse, notNullValue());
+        assertThat(subscriptionResponse.getNumberOfItems(), equalTo(1));
+
+        // Now unsubscribe again ...
+
+        // TODO: Setup the mock to actually perform the unsubscription.
+        /*PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest();
+        for (SubscriptionResponseItem<?> subscriptionResponseItem : subscriptionResponse.getResponseItems()) {
+            unsubscriptionRequest.addItem(subscriptionResponseItem.getSubscriptionHandle());
+        }
+        CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = SUT.unsubscribe(unsubscriptionRequest);
+        PlcUnsubscriptionResponse plcUnsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
+        assertThat(plcUnsubscriptionResponse, notNullValue());*/
     }
 }
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e2796f4..ab4629a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -316,6 +316,9 @@
 
             <!-- Jenkins build related files -->
             <exclude>.repository/**</exclude>
+
+            <!-- Data files created by examples running an embedded elasticsearch -->
+            <exclude>elasticsearch-data/**</exclude>
           </excludes>
         </configuration>
       </plugin>