You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/10/18 13:20:23 UTC

[incubator-plc4x] 01/02: [GENERAL] updated subscription api after big refactorings

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 514719c971214bd660440045cc900f0690f1a9e6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Oct 18 15:08:20 2018 +0200

    [GENERAL] updated subscription api after big refactorings
---
 .../api/messages/PlcUnsubscriptionRequest.java     |  4 +-
 .../api/messages/PlcUnsubscriptionResponse.java    |  2 +-
 .../java/api/model/PlcConsumerRegistration.java    |  1 +
 .../java/api/model/PlcSubscriptionHandle.java      |  5 ++
 .../java/ads/connection/AdsTcpPlcConnection.java   | 30 +++++-----
 .../java/ads/model/AdsSubscriptionHandle.java      |  8 ++-
 .../plc4x/java/ads/protocol/Plc4x2AdsProtocol.java |  5 +-
 .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java  | 40 +++++++++----
 .../messages/DefaultPlcSubscriptionRequest.java    | 65 +++++++++++++---------
 .../messages/DefaultPlcUnsubscriptionRequest.java  | 35 ++----------
 .../messages/DefaultPlcUnsubscriptionResponse.java | 19 -------
 .../messages/InternalPlcSubscriptionRequest.java   |  7 ++-
 .../messages/InternalPlcUnsubscriptionRequest.java |  2 +-
 .../plc4x/java/base/messages/PlcSubscriber.java    |  2 -
 .../base/model/DefaultPlcConsumerRegistration.java | 14 ++++-
 ...tion.java => DefaultPlcSubscriptionHandle.java} | 39 ++++++-------
 .../java/base/model/SubscriptionPlcField.java      | 55 ++++++++++++++++++
 .../SingleItemToSingleRequestProtocolTest.java     | 10 ++--
 18 files changed, 197 insertions(+), 146 deletions(-)

diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
index 8b95f44..74caccd 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java
@@ -23,10 +23,10 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
-public interface PlcUnsubscriptionRequest extends PlcFieldRequest {
+public interface PlcUnsubscriptionRequest extends PlcRequest {
 
     @Override
-    CompletableFuture<? extends PlcUnsubscriptionResponse> execute();
+    CompletableFuture<PlcUnsubscriptionResponse> execute();
 
     interface Builder extends PlcRequestBuilder {
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
index 4205e8b..705ee8d 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionResponse.java
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.plc4x.java.api.messages;
 
-public interface PlcUnsubscriptionResponse extends PlcFieldResponse {
+public interface PlcUnsubscriptionResponse extends PlcResponse {
 
     @Override
     PlcUnsubscriptionRequest getRequest();
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java
index 0f3e65b..a0a8c52 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcConsumerRegistration.java
@@ -20,4 +20,5 @@
 package org.apache.plc4x.java.api.model;
 
 public interface PlcConsumerRegistration {
+    void unregister();
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java
index d295e9e..a9338ed 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/model/PlcSubscriptionHandle.java
@@ -18,6 +18,10 @@ under the License.
 */
 package org.apache.plc4x.java.api.model;
 
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+
+import java.util.function.Consumer;
+
 /**
  * When subscribing to remote resources, depending on the used protocol
  * different data is used to identify a subscription. This interface is
@@ -29,4 +33,5 @@ package org.apache.plc4x.java.api.model;
  */
 public interface PlcSubscriptionHandle {
 
+    PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer);
 }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 406557f..d4b2364 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -32,7 +32,6 @@ import org.apache.plc4x.java.ads.model.*;
 import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
-import org.apache.plc4x.java.base.messages.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
@@ -43,8 +42,10 @@ import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.model.DefaultPlcConsumerRegistration;
 import org.apache.plc4x.java.base.model.InternalPlcConsumerRegistration;
 import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 import org.apache.plc4x.java.base.protocol.SingleItemToSingleRequestProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,10 +53,8 @@ import org.slf4j.LoggerFactory;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -146,7 +145,8 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
             throw new PlcNotImplementedException("Multirequest on subscribe not implemented yet");
         }
 
-        PlcField field = internalPlcSubscriptionRequest.getFields().get(0);
+        SubscriptionPlcField subscriptionPlcField = internalPlcSubscriptionRequest.getSubscriptionFields().get(0);
+        PlcField field = subscriptionPlcField.getPlcField();
 
         IndexGroup indexGroup;
         IndexOffset indexOffset;
@@ -178,15 +178,17 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
         }
 
         final TransmissionMode transmissionMode;
-        switch (internalPlcSubscriptionRequest.getPlcSubscriptionType()) {
+        long cycleTime = 4000000;
+        switch (subscriptionPlcField.getPlcSubscriptionType()) {
             case CYCLIC:
                 transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
+                cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS);
                 break;
             case CHANGE_OF_STATE:
                 transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
                 break;
             default:
-                throw new PlcRuntimeException("Unmapped type " + internalPlcSubscriptionRequest.getPlcSubscriptionType());
+                throw new PlcRuntimeException("Unmapped type " + subscriptionPlcField.getPlcSubscriptionType());
         }
 
         // Prepare the subscription request itself.
@@ -201,7 +203,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
             Length.of(adsDataType.getTargetByteSize() * numberOfElements),
             transmissionMode,
             MaxDelay.of(0),
-            CycleTime.of(4000000)
+            CycleTime.of(cycleTime)
         );
 
         // Send the request to the plc and wait for a response
@@ -215,7 +217,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
         if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
             throw new PlcRuntimeException("Error code received " + response.getResult());
         }
-        AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(response.getNotificationHandle());
+        AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(this, response.getNotificationHandle());
 
         Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseItems = internalPlcSubscriptionRequest.getFieldNames()
             .stream()
@@ -262,11 +264,9 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
 
     @Override
     public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
-        return register(consumer, handles);
+        return register(consumer, handles.toArray(new PlcSubscriptionHandle[0]));
     }
 
-    // TODO: figure out what this is
-    /*@Override
     public InternalPlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, PlcSubscriptionHandle... handles) {
         Objects.requireNonNull(consumer);
         Objects.requireNonNull(handles);
@@ -275,7 +275,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
             internalPlcSubscriptionHandles[i] = checkInternal(handles[i], InternalPlcSubscriptionHandle.class);
         }
 
-        InternalPlcConsumerRegistration internalPlcConsumerRegistration = new DefaultPlcConsumerRegistration(consumer, internalPlcSubscriptionHandles);
+        InternalPlcConsumerRegistration internalPlcConsumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, internalPlcSubscriptionHandles);
 
         Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
             adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
@@ -300,7 +300,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
         getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
 
         return internalPlcConsumerRegistration;
-    }*/
+    }
 
     @Override
     public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
index 31b90ea..d7927cb 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/model/AdsSubscriptionHandle.java
@@ -19,15 +19,17 @@ under the License.
 package org.apache.plc4x.java.ads.model;
 
 import org.apache.plc4x.java.ads.api.commands.types.NotificationHandle;
-import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
+import org.apache.plc4x.java.base.model.DefaultPlcSubscriptionHandle;
 
 import java.util.Objects;
 
-public class AdsSubscriptionHandle implements InternalPlcSubscriptionHandle {
+public class AdsSubscriptionHandle extends DefaultPlcSubscriptionHandle {
 
     private NotificationHandle notificationHandle;
 
-    public AdsSubscriptionHandle(NotificationHandle notificationHandle) {
+    public AdsSubscriptionHandle(PlcSubscriber plcSubscriber, NotificationHandle notificationHandle) {
+        super(plcSubscriber);
         this.notificationHandle = notificationHandle;
     }
 
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
index fbb359e..1ba5cd9 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
@@ -28,6 +28,7 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.api.generic.types.Invoke;
 import org.apache.plc4x.java.ads.model.AdsDataType;
+import org.apache.plc4x.java.ads.model.AdsField;
 import org.apache.plc4x.java.ads.model.DirectAdsField;
 import org.apache.plc4x.java.ads.model.SymbolicAdsField;
 import org.apache.plc4x.java.ads.protocol.exception.AdsException;
@@ -35,7 +36,6 @@ import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcProtocolPayloadTooBigException;
-import org.apache.plc4x.java.base.messages.PlcProprietaryRequest;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
@@ -292,8 +292,7 @@ public class Plc4x2AdsProtocol extends MessageToMessageCodec<AmsPacket, PlcReque
         InternalPlcReadRequest plcReadRequest = (InternalPlcReadRequest) requestContainer.getRequest();
 
         // TODO: only single requests supported for now
-        DirectAdsField field = (DirectAdsField) plcReadRequest.getFields().get(0);
-
+        AdsField field = (AdsField) plcReadRequest.getFields().get(0);
 
         PlcResponseCode responseCode = decodeResponseCode(responseMessage.getResult());
         byte[] bytes = responseMessage.getData().getBytes();
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
index 491f606..f65d189 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java
@@ -21,9 +21,13 @@ package org.apache.plc4x.java.ads;
 import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class ManualPlc4XAdsTest {
 
@@ -34,34 +38,48 @@ public class ManualPlc4XAdsTest {
             connectionUrl = "ads:serial:///dev/ttys003/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
         } else {
             System.out.println("Using tcp");
-            connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/192.168.113.1.1.1:30000";
+            connectionUrl = "ads:tcp://10.10.64.40/10.10.64.40.1.1:851/10.10.56.23.1.1:30000";
         }
+        // TODO: temporary workaround
+        Thread.currentThread().setUncaughtExceptionHandler((t, e) -> {
+            System.err.println(t + " - " + e.getMessage());
+            e.printStackTrace(System.err);
+            System.exit(1);
+        });
         try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) {
             System.out.println("PlcConnection " + plcConnection);
 
-            PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("station", "Allgemein_S2.Station:BYTE").build();
+            PlcReadRequest.Builder readRequestBuilder = plcConnection.readRequestBuilder().orElseThrow(RuntimeException::new);
+            PlcReadRequest readRequest = readRequestBuilder.addItem("station", "Allgemein_S2.Station:BYTE").build();
             CompletableFuture<? extends PlcReadResponse> response = readRequest.execute();
             PlcReadResponse readResponse = response.get();
             System.out.println("Response " + readResponse);
             Collection<Integer> stations = readResponse.getAllIntegers("station");
             stations.forEach(System.out::println);
 
-            PlcSubscriptionRequest subscriptionRequest = plcConnection.subscriptionRequestBuilder().get().addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build();
-            CompletableFuture<? extends PlcSubscriptionResponse> subscribeResponse = subscriptionRequest.execute();
-            PlcSubscriptionResponse plcSubscriptionResponse = subscribeResponse.get();
+            // 2. We build a subscription
+            PlcSubscriptionRequest.Builder subscriptionRequestBuilder = plcConnection.subscriptionRequestBuilder().orElseThrow(RuntimeException::new);
+            PlcSubscriptionRequest subscriptionRequest = subscriptionRequestBuilder.addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build();
+            PlcSubscriptionResponse plcSubscriptionResponse = subscriptionRequest.execute().get();
 
-            // TODO: figure out what to do with this
-            /*PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles());
+            List<PlcConsumerRegistration> plcConsumerRegistrations = plcSubscriptionResponse.getSubscriptionHandles().stream()
+                .map(plcSubscriptionHandle -> plcSubscriptionHandle.register(System.out::println))
+                .collect(Collectors.toList());
 
+            // Now we wait a bit
             TimeUnit.SECONDS.sleep(5);
 
-            plcSubscriber.unregister(plcConsumerRegistration);
-            PlcUnsubscriptionRequest unsubscriptionRequest = plcConnection.unsubscriptionRequestBuilder().get().addHandles(plcSubscriptionResponse.getSubscriptionHandles()).build();
-            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionResponse = plcSubscriber.unsubscribe(unsubscriptionRequest);
+            // we unregister the listener
+            plcConsumerRegistrations.forEach(PlcConsumerRegistration::unregister);
+
+            // we unsubscribe at the plc
+            PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder = plcConnection.unsubscriptionRequestBuilder().orElseThrow(RuntimeException::new);
+            PlcUnsubscriptionRequest unsubscriptionRequest = unsubscriptionRequestBuilder.addHandles(plcSubscriptionResponse.getSubscriptionHandles()).build();
+            CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionResponse = unsubscriptionRequest.execute();
 
             unsubscriptionResponse
                 .get(5, TimeUnit.SECONDS);
-            System.out.println(unsubscriptionResponse);*/
+            System.out.println(unsubscriptionResponse);
         }
         System.exit(0);
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 815b46e..fd0f595 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -18,27 +18,28 @@ under the License.
 */
 package org.apache.plc4x.java.base.messages;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcSubscriptionType;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
-import org.apache.plc4x.java.base.messages.items.FieldItem;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 
-// TODO: request broken needs finishing.
 public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionRequest, InternalPlcFieldRequest {
 
     private final PlcSubscriber subscriber;
 
-    public DefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+    private LinkedHashMap<String, SubscriptionPlcField> fields;
+
+    public DefaultPlcSubscriptionRequest(PlcSubscriber subscriber, LinkedHashMap<String, SubscriptionPlcField> fields) {
         this.subscriber = subscriber;
+        this.fields = fields;
     }
 
     @Override
@@ -48,32 +49,39 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
 
     @Override
     public int getNumberOfFields() {
-        throw new IllegalStateException("not available");
+        return fields.size();
     }
 
     @Override
     public LinkedHashSet<String> getFieldNames() {
-        throw new IllegalStateException("not available");
+        return new LinkedHashSet<>(fields.keySet());
     }
 
     @Override
     public PlcField getField(String name) {
-        throw new IllegalStateException("not available");
+        SubscriptionPlcField subscriptionPlcField = fields.get(name);
+        if (subscriptionPlcField == null) {
+            return null;
+        }
+        return subscriptionPlcField.getPlcField();
     }
 
     @Override
     public LinkedList<PlcField> getFields() {
-        throw new IllegalStateException("not available");
+        return fields.values().stream().map(SubscriptionPlcField::getPlcField).collect(Collectors.toCollection(LinkedList::new));
     }
 
     @Override
-    public PlcSubscriptionType getPlcSubscriptionType() {
-        throw new IllegalStateException("not available");
+    public LinkedList<SubscriptionPlcField> getSubscriptionFields() {
+        return new LinkedList<>(fields.values());
     }
 
     @Override
     public LinkedList<Pair<String, PlcField>> getNamedFields() {
-        throw new IllegalStateException("not available");
+        return fields.entrySet()
+            .stream()
+            .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), (PlcField) stringPlcFieldEntry.getValue()))
+            .collect(Collectors.toCollection(LinkedList::new));
     }
 
     public static class Builder implements PlcSubscriptionRequest.Builder {
@@ -90,41 +98,48 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
 
         @Override
         public PlcSubscriptionRequest.Builder addCyclicField(String name, String fieldQuery, Duration pollingInterval) {
-            return null;
+            fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CYCLIC, pollingInterval));
+            return this;
         }
 
         @Override
         public PlcSubscriptionRequest.Builder addChangeOfStateField(String name, String fieldQuery) {
-            return null;
+            fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CHANGE_OF_STATE));
+            return this;
         }
 
         @Override
         public PlcSubscriptionRequest.Builder addEventField(String name, String fieldQuery) {
-            return null;
+            fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.EVENT));
+            return this;
         }
 
         @Override
         public PlcSubscriptionRequest build() {
-            LinkedHashMap<String, Pair<PlcField, FieldItem>> parsedFields = new LinkedHashMap<>();
+            LinkedHashMap<String, SubscriptionPlcField> parsedFields = new LinkedHashMap<>();
+
             fields.forEach((name, builderItem) -> {
-                // Compile the query string.
                 PlcField parsedField = fieldHandler.createField(builderItem.fieldQuery);
-                // Encode the payload.
-                // TODO: Depending on the field type, handle the FieldItem creation differently.
-                FieldItem fieldItem = builderItem.encoder.apply(parsedField, null);
-                parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
+                parsedFields.put(name, new SubscriptionPlcField(builderItem.plcSubscriptionType, parsedField, builderItem.duration));
             });
-            return new DefaultPlcSubscriptionRequest(subscriber);
+            return new DefaultPlcSubscriptionRequest(subscriber, parsedFields);
         }
 
         private static class BuilderItem<T> {
             private final String fieldQuery;
-            private final BiFunction<PlcField, T[], FieldItem> encoder;
+            private final PlcSubscriptionType plcSubscriptionType;
+            private final Duration duration;
 
-            private BuilderItem(String fieldQuery, BiFunction<PlcField, T[], FieldItem> encoder) {
+            private BuilderItem(String fieldQuery, PlcSubscriptionType plcSubscriptionType) {
+                this(fieldQuery, plcSubscriptionType, null);
+            }
+
+            private BuilderItem(String fieldQuery, PlcSubscriptionType plcSubscriptionType, Duration duration) {
                 this.fieldQuery = fieldQuery;
-                this.encoder = encoder;
+                this.plcSubscriptionType = plcSubscriptionType;
+                this.duration = duration;
             }
+
         }
 
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 3e13d57..4f162d4 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -18,19 +18,19 @@
  */
 package org.apache.plc4x.java.base.messages;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
-import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
-// TODO: request broken needs finishing.
-public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcFieldRequest {
+public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcRequest {
 
     private final PlcSubscriber subscriber;
 
@@ -47,35 +47,10 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
     }
 
     @Override
-    public int getNumberOfFields() {
-        throw new IllegalStateException("not available");
-    }
-
-    @Override
-    public LinkedHashSet<String> getFieldNames() {
-        throw new IllegalStateException("not available");
-    }
-
-    @Override
-    public PlcField getField(String name) {
-        throw new IllegalStateException("not available");
-    }
-
-    @Override
-    public LinkedList<PlcField> getFields() {
-        throw new IllegalStateException("not available");
-    }
-
-    @Override
     public Collection<? extends InternalPlcSubscriptionHandle> getInternalPlcSubscriptionHandles() {
         return internalPlcSubscriptionHandles;
     }
 
-    @Override
-    public LinkedList<Pair<String, PlcField>> getNamedFields() {
-        throw new IllegalStateException("not available");
-    }
-
     public static class Builder implements PlcUnsubscriptionRequest.Builder {
 
         private final PlcSubscriber subscriber;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
index 2207e75..e3f855a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionResponse.java
@@ -19,29 +19,10 @@
 package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
-import org.apache.plc4x.java.api.model.PlcField;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
-
-import java.util.Collection;
 
 public class DefaultPlcUnsubscriptionResponse implements InternalPlcUnsubscriptionResponse {
 
     @Override
-    public Collection<String> getFieldNames() {
-        return null;
-    }
-
-    @Override
-    public PlcField getField(String name) {
-        return null;
-    }
-
-    @Override
-    public PlcResponseCode getResponseCode(String name) {
-        return null;
-    }
-
-    @Override
     public PlcUnsubscriptionRequest getRequest() {
         return null;
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
index 3d4ca98..608691a 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
@@ -19,10 +19,11 @@
 package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
-import org.apache.plc4x.java.api.types.PlcSubscriptionType;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 
-public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest, InternalPlcFieldRequest {
+import java.util.LinkedList;
 
-    PlcSubscriptionType getPlcSubscriptionType();
+public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest, InternalPlcFieldRequest {
 
+    LinkedList<SubscriptionPlcField> getSubscriptionFields();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
index 7a8c3db..fd5b3ca 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcUnsubscriptionRequest.java
@@ -23,7 +23,7 @@ import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
 
 import java.util.Collection;
 
-public interface InternalPlcUnsubscriptionRequest extends PlcUnsubscriptionRequest, InternalPlcFieldRequest {
+public interface InternalPlcUnsubscriptionRequest extends PlcUnsubscriptionRequest, InternalPlcRequest {
 
     Collection<? extends InternalPlcSubscriptionHandle> getInternalPlcSubscriptionHandles();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
index 577a774..8d7eedb 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcSubscriber.java
@@ -22,10 +22,8 @@ import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 
 /**
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
index 28583b3..55b1cf9 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
@@ -19,6 +19,7 @@
 package org.apache.plc4x.java.base.model;
 
 import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -27,12 +28,16 @@ import java.util.function.Consumer;
 
 public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegistration {
 
+    private final PlcSubscriber plcSubscriber;
+
     private final Collection<? extends InternalPlcSubscriptionHandle> handles;
+
     private final int consumerHash;
 
-    public DefaultPlcConsumerRegistration(Consumer<PlcSubscriptionEvent> consumer, InternalPlcSubscriptionHandle... handles) {
-        consumerHash = Objects.requireNonNull(consumer).hashCode();
+    public DefaultPlcConsumerRegistration(PlcSubscriber plcSubscriber, Consumer<PlcSubscriptionEvent> consumer, InternalPlcSubscriptionHandle... handles) {
+        this.plcSubscriber = plcSubscriber;
         this.handles = Arrays.asList(Objects.requireNonNull(handles));
+        this.consumerHash = Objects.requireNonNull(consumer).hashCode();
     }
 
     @Override
@@ -70,4 +75,9 @@ public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegist
             ", consumerHash=" + consumerHash +
             '}';
     }
+
+    @Override
+    public void unregister() {
+        plcSubscriber.unregister(this);
+    }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcSubscriptionHandle.java
similarity index 52%
copy from plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
copy to plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcSubscriptionHandle.java
index 28583b3..3eb685b 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcConsumerRegistration.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/DefaultPlcSubscriptionHandle.java
@@ -16,33 +16,28 @@
  specific language governing permissions and limitations
  under the License.
  */
+
 package org.apache.plc4x.java.base.model;
 
 import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
+import org.apache.plc4x.java.base.messages.PlcSubscriber;
 
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.function.Consumer;
 
-public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegistration {
-
-    private final Collection<? extends InternalPlcSubscriptionHandle> handles;
-    private final int consumerHash;
+public class DefaultPlcSubscriptionHandle implements InternalPlcSubscriptionHandle {
 
-    public DefaultPlcConsumerRegistration(Consumer<PlcSubscriptionEvent> consumer, InternalPlcSubscriptionHandle... handles) {
-        consumerHash = Objects.requireNonNull(consumer).hashCode();
-        this.handles = Arrays.asList(Objects.requireNonNull(handles));
-    }
+    private final PlcSubscriber plcSubscriber;
 
-    @Override
-    public int getConsumerHash() {
-        return consumerHash;
+    public DefaultPlcSubscriptionHandle(PlcSubscriber plcSubscriber) {
+        this.plcSubscriber = plcSubscriber;
     }
 
     @Override
-    public Collection<? extends InternalPlcSubscriptionHandle> getAssociatedHandles() {
-        return handles;
+    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer) {
+        return plcSubscriber.register(consumer, Collections.singletonList(this));
     }
 
     @Override
@@ -50,24 +45,22 @@ public class DefaultPlcConsumerRegistration implements InternalPlcConsumerRegist
         if (this == o) {
             return true;
         }
-        if (!(o instanceof DefaultPlcConsumerRegistration)) {
+        if (!(o instanceof DefaultPlcSubscriptionHandle)) {
             return false;
         }
-        DefaultPlcConsumerRegistration that = (DefaultPlcConsumerRegistration) o;
-        return consumerHash == that.consumerHash &&
-            Objects.equals(handles, that.handles);
+        DefaultPlcSubscriptionHandle that = (DefaultPlcSubscriptionHandle) o;
+        return Objects.equals(plcSubscriber, that.plcSubscriber);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(handles, consumerHash);
+        return Objects.hash(plcSubscriber);
     }
 
     @Override
     public String toString() {
-        return "DefaultPlcConsumerRegistration{" +
-            "handles=" + handles +
-            ", consumerHash=" + consumerHash +
+        return "DefaultPlcSubscriptionHandle{" +
+            "plcSubscriber=" + plcSubscriber +
             '}';
     }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/SubscriptionPlcField.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/SubscriptionPlcField.java
new file mode 100644
index 0000000..e68e4c8
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/model/SubscriptionPlcField.java
@@ -0,0 +1,55 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.model;
+
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcSubscriptionType;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * special {@link PlcField} which adds a {@link PlcSubscriptionType}.
+ */
+public class SubscriptionPlcField {
+
+    private final PlcSubscriptionType plcSubscriptionType;
+
+    private final PlcField plcField;
+
+    private final Duration duration;
+
+    public SubscriptionPlcField(PlcSubscriptionType plcSubscriptionType, PlcField plcField, Duration duration) {
+        this.plcSubscriptionType = plcSubscriptionType;
+        this.plcField = plcField;
+        this.duration = duration;
+    }
+
+    public PlcSubscriptionType getPlcSubscriptionType() {
+        return plcSubscriptionType;
+    }
+
+    public PlcField getPlcField() {
+        return plcField;
+    }
+
+    public Optional<Duration> getDuration() {
+        return Optional.ofNullable(duration);
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index 5a9ac98..b3c6458 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -25,15 +25,13 @@ import io.netty.channel.PendingWriteQueue;
 import io.netty.util.HashedWheelTimer;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.plc4x.java.base.messages.PlcReader;
-import org.apache.plc4x.java.base.messages.PlcSubscriber;
-import org.apache.plc4x.java.base.messages.PlcWriter;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.base.messages.*;
 import org.apache.plc4x.java.base.messages.items.FieldItem;
 import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 import org.assertj.core.api.WithAssertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -434,13 +432,13 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
 
     private static class TestDefaultPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest {
 
-        private TestDefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
-            super(subscriber);
+        private TestDefaultPlcSubscriptionRequest(PlcSubscriber subscriber, LinkedHashMap<String, SubscriptionPlcField> fields) {
+            super(subscriber, fields);
         }
 
         private static TestDefaultPlcSubscriptionRequest build(PlcSubscriber subscriber) {
             // TODO: implement me once available
-            return new TestDefaultPlcSubscriptionRequest(subscriber);
+            return new TestDefaultPlcSubscriptionRequest(subscriber, new LinkedHashMap<>());
         }
     }