You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2021/07/05 08:55:24 UTC

[plc4x] branch feature/profinet-chris updated: plc4j: fixed timing issue with SimulatedConnectionTest

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/profinet-chris
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/feature/profinet-chris by this push:
     new a9e804e  plc4j: fixed timing issue with SimulatedConnectionTest
a9e804e is described below

commit a9e804e3d533be148aed4547c15794143fb79d30
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jul 5 10:55:11 2021 +0200

    plc4j: fixed timing issue with SimulatedConnectionTest
    
    - root cause: `TimeUnit.NANOSECONDS.sleep((long) random.nextInt() * 10)` can create possible a very long sleep
---
 .../simulated/connection/SimulatedConnection.java  | 75 ++++++++++------------
 .../java/simulated/connection/SimulatedDevice.java | 41 +++++++++---
 .../plc4x/java/simulated/field/SimulatedField.java |  4 +-
 .../java/simulated/types/SimulatedFieldType.java   |  2 -
 .../plc4x/java/simulated/utils/StaticHelper.java   |  3 +-
 .../connection/SimulatedConnectionTest.java        | 19 +++++-
 .../messages/DefaultPlcSubscriptionRequest.java    |  9 ++-
 .../spi/model/DefaultPlcSubscriptionField.java     |  9 +++
 .../spi/model/DefaultPlcSubscriptionHandle.java    |  5 +-
 9 files changed, 109 insertions(+), 58 deletions(-)

diff --git a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedConnection.java b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedConnection.java
index 5e60169..26a8379 100644
--- a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedConnection.java
+++ b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedConnection.java
@@ -50,7 +50,10 @@ import org.apache.plc4x.java.spi.messages.PlcWriter;
 import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
 import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
 import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
+import org.apache.plc4x.java.spi.optimizer.SingleFieldOptimizer;
 import org.apache.plc4x.java.spi.values.IEC61131ValueHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Instant;
 import java.util.Collection;
@@ -69,15 +72,18 @@ import java.util.function.Consumer;
  */
 public class SimulatedConnection extends AbstractPlcConnection implements PlcReader, PlcWriter, PlcSubscriber {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(SimulatedConnection.class);
+
     private final SimulatedDevice device;
 
     private boolean connected = false;
 
-    private Map<PlcSubscriptionHandle, PlcConsumerRegistration> registrations = new ConcurrentHashMap<>();
+    private final Map<PlcSubscriptionHandle, PlcConsumerRegistration> registrations = new ConcurrentHashMap<>();
 
-    private Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();
+    private final Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();
 
     public SimulatedConnection(SimulatedDevice device) {
+        super(true, true, true, new SimulatedFieldHandler(), new IEC61131ValueHandler(), null);
         this.device = device;
     }
 
@@ -97,41 +103,6 @@ public class SimulatedConnection extends AbstractPlcConnection implements PlcRea
     }
 
     @Override
-    public boolean canRead() {
-        return true;
-    }
-
-    @Override
-    public boolean canWrite() {
-        return true;
-    }
-
-    @Override
-    public boolean canSubscribe() {
-        return true;
-    }
-
-    @Override
-    public PlcReadRequest.Builder readRequestBuilder() {
-        return new DefaultPlcReadRequest.Builder(this, new SimulatedFieldHandler());
-    }
-
-    @Override
-    public PlcWriteRequest.Builder writeRequestBuilder() {
-        return new DefaultPlcWriteRequest.Builder(this, new SimulatedFieldHandler(), new IEC61131ValueHandler());
-    }
-
-    @Override
-    public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
-        return new DefaultPlcSubscriptionRequest.Builder(this, new SimulatedFieldHandler());
-    }
-
-    @Override
-    public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
-        return new DefaultPlcUnsubscriptionRequest.Builder(this);
-    }
-
-    @Override
     public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
         Map<String, ResponseItem<PlcValue>> fields = new HashMap<>();
         for (String fieldName : readRequest.getFieldNames()) {
@@ -166,20 +137,31 @@ public class SimulatedConnection extends AbstractPlcConnection implements PlcRea
         return String.format("simulated:%s", device);
     }
 
+    /**
+     * Blocking subscribe call
+     *
+     * @param subscriptionRequest subscription request containing at least one subscription request item.
+     * @return the {@code PlcSubscriptionResponse}
+     */
     @Override
     public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
+        LOGGER.info("subscribing {}", subscriptionRequest);
         Map<String, ResponseItem<PlcSubscriptionHandle>> values = new HashMap<>();
         subscriptionRequest.getFieldNames().forEach(name -> {
+            LOGGER.info("creating handle for field name {}", name);
             PlcSubscriptionHandle handle = new DefaultPlcSubscriptionHandle(this);
             final PlcSubscriptionField subscriptionPlcField = subscriptionRequest.getField(name);
             switch (subscriptionPlcField.getPlcSubscriptionType()) {
                 case CYCLIC:
+                    LOGGER.info("Adding cyclic subscription for field name {}", name);
                     device.addCyclicSubscription(dispatchSubscriptionEvent(name, handle), handle, subscriptionPlcField, subscriptionPlcField.getDuration().orElseThrow(RuntimeException::new));
                     break;
                 case CHANGE_OF_STATE:
+                    LOGGER.info("Adding change of state subscription for field name {}", name);
                     device.addChangeOfStateSubscription(dispatchSubscriptionEvent(name, handle), handle, subscriptionPlcField);
                     break;
                 case EVENT:
+                    LOGGER.info("Adding event subscription for field name {}", name);
                     device.addEventSubscription(dispatchSubscriptionEvent(name, handle), handle, subscriptionPlcField);
                     break;
             }
@@ -192,22 +174,30 @@ public class SimulatedConnection extends AbstractPlcConnection implements PlcRea
 
     private Consumer<PlcValue> dispatchSubscriptionEvent(String name, PlcSubscriptionHandle handle) {
         return plcValue -> {
+            LOGGER.info("handling plc value {}", plcValue);
             PlcConsumerRegistration plcConsumerRegistration = registrations.get(handle);
             if (plcConsumerRegistration == null) {
+                LOGGER.warn("no registration for handle {}", handle);
                 return;
             }
-            int consumerId = ((DefaultPlcConsumerRegistration) plcConsumerRegistration).getConsumerId();
+            int consumerId = plcConsumerRegistration.getConsumerId();
             Consumer<PlcSubscriptionEvent> consumer = consumerIdMap.get(consumerId);
             if (consumer == null) {
+                LOGGER.warn("no consumer for id {}", consumerId);
                 return;
             }
-            consumer.accept(new DefaultPlcSubscriptionEvent(Instant.now(),
-                Collections.singletonMap(name, new ResponseItem<>(PlcResponseCode.OK, plcValue))));
+            consumer.accept(
+                new DefaultPlcSubscriptionEvent(
+                    Instant.now(),
+                    Collections.singletonMap(name, new ResponseItem<>(PlcResponseCode.OK, plcValue))
+                )
+            );
         };
     }
 
     @Override
     public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
+        LOGGER.info("unsubscribing {}", unsubscriptionRequest);
         device.removeHandles(unsubscriptionRequest.getSubscriptionHandles());
 
         PlcUnsubscriptionResponse response = new DefaultPlcUnsubscriptionResponse(unsubscriptionRequest);
@@ -216,6 +206,7 @@ public class SimulatedConnection extends AbstractPlcConnection implements PlcRea
 
     @Override
     public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
+        LOGGER.info("Registering consumer {} with handles {}", consumer, handles);
         PlcConsumerRegistration plcConsumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new PlcSubscriptionHandle[0]));
         handles.stream()
             .map(PlcSubscriptionHandle.class::cast)
@@ -226,15 +217,19 @@ public class SimulatedConnection extends AbstractPlcConnection implements PlcRea
 
     @Override
     public void unregister(PlcConsumerRegistration registration) {
+        LOGGER.info("Unregistering {}", registration);
         Iterator<Map.Entry<PlcSubscriptionHandle, PlcConsumerRegistration>> entryIterator = registrations.entrySet().iterator();
         while (entryIterator.hasNext()) {
             Map.Entry<PlcSubscriptionHandle, PlcConsumerRegistration> entry = entryIterator.next();
             if (!entry.getValue().equals(registration)) {
+                LOGGER.debug("not the value we looking for {}. We are looking for {}", entry.getValue(), registration);
                 continue;
             }
             PlcConsumerRegistration consumerRegistration = entry.getValue();
             int consumerId = consumerRegistration.getConsumerId();
+            LOGGER.info("Removing consumer {}", consumerId);
             consumerIdMap.remove(consumerId);
+            LOGGER.info("Removing handles {}", consumerRegistration.getSubscriptionHandles());
             device.removeHandles(consumerRegistration.getSubscriptionHandles());
             entryIterator.remove();
         }
diff --git a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedDevice.java b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedDevice.java
index 698fea9..6df31cf 100644
--- a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedDevice.java
+++ b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/connection/SimulatedDevice.java
@@ -19,6 +19,7 @@
 package org.apache.plc4x.java.simulated.connection;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.value.*;
@@ -45,7 +46,7 @@ import java.util.function.Consumer;
  */
 public class SimulatedDevice {
 
-    private static final Logger logger = LoggerFactory.getLogger(SimulatedDevice.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SimulatedDevice.class);
 
     private final Random random = new SecureRandom();
 
@@ -68,6 +69,7 @@ public class SimulatedDevice {
     }
 
     public Optional<PlcValue> get(SimulatedField field) {
+        LOGGER.debug("getting field {}", field);
         Objects.requireNonNull(field);
         switch (field.getType()) {
             case STATE:
@@ -81,17 +83,21 @@ public class SimulatedDevice {
     }
 
     public void set(SimulatedField field, PlcValue value) {
+        LOGGER.debug("setting field {} to {}", field, value);
         Objects.requireNonNull(field);
         switch (field.getType()) {
             case STATE:
                 changeOfStateSubscriptions.values().stream()
                     .filter(pair -> pair.getKey().equals(field))
                     .map(Pair::getValue)
+                    .peek(plcValueConsumer -> {
+                        LOGGER.debug("{} is getting notified with {}", plcValueConsumer, value);
+                    })
                     .forEach(baseDefaultPlcValueConsumer -> baseDefaultPlcValueConsumer.accept(value));
                 state.put(field, value);
                 return;
             case STDOUT:
-                logger.info("TEST PLC STDOUT [{}]: {}", field.getName(), value.getString());
+                LOGGER.info("TEST PLC STDOUT [{}]: {}", field.getName(), value.getString());
                 return;
             case RANDOM:
                 switch (field.getPlcDataType()) {
@@ -102,16 +108,15 @@ public class SimulatedDevice {
                         try {
                             DataItemIO.staticSerialize(value, field.getPlcDataType(), field.getNumberOfElements(), false);
                         } catch (ParseException e) {
-                            logger.info("Write failed");
+                            LOGGER.info("Write failed");
                         }
                 }
-                logger.info("TEST PLC RANDOM [{}]: {}", field.getName(), value);
+                LOGGER.info("TEST PLC RANDOM [{}]: {}", field.getName(), value);
                 return;
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
 
-    @SuppressWarnings("unchecked")
     private PlcValue randomValue(SimulatedField field) {
         short fieldDataTypeSize = field.getDataType().getDataTypeSize();
 
@@ -124,7 +129,6 @@ public class SimulatedDevice {
         } catch (ParseException e) {
             return null;
         }
-
     }
 
     @Override
@@ -133,8 +137,12 @@ public class SimulatedDevice {
     }
 
     public void addCyclicSubscription(Consumer<PlcValue> consumer, PlcSubscriptionHandle handle, PlcSubscriptionField plcField, Duration duration) {
+        LOGGER.debug("Adding cyclic subscription: {}, {}, {}, {}", consumer, handle, plcField, duration);
+        assert plcField instanceof DefaultPlcSubscriptionField;
         ScheduledFuture<?> scheduledFuture = scheduler.scheduleAtFixedRate(() -> {
-            PlcValue baseDefaultPlcValue = state.get(((DefaultPlcSubscriptionField) plcField).getPlcField());
+            PlcField innerPlcField = ((DefaultPlcSubscriptionField) plcField).getPlcField();
+            assert innerPlcField instanceof SimulatedField;
+            PlcValue baseDefaultPlcValue = state.get(innerPlcField);
             if (baseDefaultPlcValue == null) {
                 return;
             }
@@ -144,21 +152,33 @@ public class SimulatedDevice {
     }
 
     public void addChangeOfStateSubscription(Consumer<PlcValue> consumer, PlcSubscriptionHandle handle, PlcSubscriptionField plcField) {
+        LOGGER.debug("Adding change of state subscription: {}, {}, {}", consumer, handle, plcField);
         changeOfStateSubscriptions.put(handle, Pair.of((SimulatedField) ((DefaultPlcSubscriptionField) plcField).getPlcField(), consumer));
     }
 
     public void addEventSubscription(Consumer<PlcValue> consumer, PlcSubscriptionHandle handle, PlcSubscriptionField plcField) {
+        LOGGER.debug("Adding event subscription: {}, {}, {}", consumer, handle, plcField);
+        assert plcField instanceof DefaultPlcSubscriptionField;
         Future<?> submit = pool.submit(() -> {
+            LOGGER.debug("WORKER: starting for {}, {}, {}", consumer, handle, plcField);
             while (!Thread.currentThread().isInterrupted()) {
-                PlcValue baseDefaultPlcValue = state.get(((DefaultPlcSubscriptionField) plcField).getPlcField());
+                LOGGER.debug("WORKER: running for {}, {}, {}", consumer, handle, plcField);
+                PlcField innerPlcField = ((DefaultPlcSubscriptionField) plcField).getPlcField();
+                assert innerPlcField instanceof SimulatedField;
+                PlcValue baseDefaultPlcValue = state.get(innerPlcField);
                 if (baseDefaultPlcValue == null) {
+                    LOGGER.debug("WORKER: no value for {}, {}, {}", consumer, handle, plcField);
                     continue;
                 }
+                LOGGER.debug("WORKER: accepting {} for {}, {}, {}", baseDefaultPlcValue, consumer, handle, plcField);
                 consumer.accept(baseDefaultPlcValue);
                 try {
-                    TimeUnit.NANOSECONDS.sleep((long)random.nextInt() * 10);
+                    long sleepTime = Math.min(random.nextInt((int) TimeUnit.SECONDS.toNanos(5)), TimeUnit.MILLISECONDS.toNanos(500));
+                    LOGGER.debug("WORKER: sleeping {} milliseconds for {}, {}, {}", TimeUnit.NANOSECONDS.toMillis(sleepTime), consumer, handle, plcField);
+                    TimeUnit.NANOSECONDS.sleep(sleepTime);
                 } catch (InterruptedException ignore) {
                     Thread.currentThread().interrupt();
+                    LOGGER.debug("WORKER: got interrupted for {}, {}, {}", consumer, handle, plcField);
                     return;
                 }
             }
@@ -168,9 +188,11 @@ public class SimulatedDevice {
     }
 
     public void removeHandles(Collection<? extends PlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+        LOGGER.debug("remove handles {}", internalPlcSubscriptionHandles);
         internalPlcSubscriptionHandles.forEach(handle -> {
             ScheduledFuture<?> remove = cyclicSubscriptions.remove(handle);
             if (remove == null) {
+                LOGGER.debug("nothing to cancel {}", handle);
                 return;
             }
             remove.cancel(true);
@@ -178,6 +200,7 @@ public class SimulatedDevice {
         internalPlcSubscriptionHandles.forEach(handle -> {
             Future<?> remove = eventSubscriptions.remove(handle);
             if (remove == null) {
+                LOGGER.debug("nothing to cancel {}", handle);
                 return;
             }
             remove.cancel(true);
diff --git a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/field/SimulatedField.java b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/field/SimulatedField.java
index 9740b8d..a529ee5 100644
--- a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/field/SimulatedField.java
+++ b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/field/SimulatedField.java
@@ -36,14 +36,12 @@ import java.util.regex.Pattern;
  */
 public class SimulatedField implements PlcField {
 
-    private static final Logger logger = LoggerFactory.getLogger(SimulatedField.class);
-
     /**
      * Examples:
      * - {@code RANDOM/foo:INTEGER}
      * - {@code STDOUT/foo:STRING}
      */
-    private static final Pattern ADDRESS_PATTERN = Pattern.compile("^(?<type>\\w+)/(?<name>[a-zA-Z0-9_\\.]+):(?<dataType>[a-zA-Z0-9]++)(\\[(?<numElements>\\d+)])?$");
+    private static final Pattern ADDRESS_PATTERN = Pattern.compile("^(?<type>\\w+)/(?<name>[a-zA-Z0-9_\\\\.]+):(?<dataType>[a-zA-Z0-9]++)(\\[(?<numElements>\\d+)])?$");
 
     private final SimulatedFieldType type;
     private final String name;
diff --git a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/types/SimulatedFieldType.java b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/types/SimulatedFieldType.java
index 47da98b..560b64e 100644
--- a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/types/SimulatedFieldType.java
+++ b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/types/SimulatedFieldType.java
@@ -20,9 +20,7 @@
 package org.apache.plc4x.java.simulated.types;
 
 public enum SimulatedFieldType {
-
     RANDOM,
     STATE,
     STDOUT
-
 }
diff --git a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/utils/StaticHelper.java b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/utils/StaticHelper.java
index f0d5770..e724dbb 100644
--- a/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/utils/StaticHelper.java
+++ b/plc4j/drivers/simulated/src/main/java/org/apache/plc4x/java/simulated/utils/StaticHelper.java
@@ -33,8 +33,9 @@ public class StaticHelper {
             // This is the maximum number of bytes a string can be long.
             short stringLength = io.readUnsignedShort(8);
             // Read the full size of the string.
-            String str = io.readString(stringLength * 8, (String) encoding);
+            String str = io.readString(stringLength * 8, encoding);
             // Cut off the parts that don't belong to it.
+            // TODO: shouldn't the above comment indicate that some code should appear here
             return str;
         } catch (ParseException e) {
             return null;
diff --git a/plc4j/drivers/simulated/src/test/java/org/apache/plc4x/java/simulated/connection/SimulatedConnectionTest.java b/plc4j/drivers/simulated/src/test/java/org/apache/plc4x/java/simulated/connection/SimulatedConnectionTest.java
index f96ecc1..50ba6ad 100644
--- a/plc4j/drivers/simulated/src/test/java/org/apache/plc4x/java/simulated/connection/SimulatedConnectionTest.java
+++ b/plc4j/drivers/simulated/src/test/java/org/apache/plc4x/java/simulated/connection/SimulatedConnectionTest.java
@@ -29,6 +29,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -44,6 +46,8 @@ import static org.mockito.Mockito.mock;
 @ExtendWith(MockitoExtension.class)
 class SimulatedConnectionTest implements WithAssertions {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(SimulatedConnectionTest.class);
+
     SimulatedConnection SUT;
 
     @Mock
@@ -133,6 +137,7 @@ class SimulatedConnectionTest implements WithAssertions {
     class Registration {
         @Test
         void register() {
+            @SuppressWarnings("unchecked")
             PlcConsumerRegistration register = SUT.register(mock(Consumer.class), Collections.emptyList());
             assertThat(register).isNotNull();
         }
@@ -152,6 +157,7 @@ class SimulatedConnectionTest implements WithAssertions {
 
         @Test
         void subscription() throws Exception {
+            LOGGER.trace("initialize");
             // Initialize the addresses
             PlcWriteRequest plcWriteRequest = SUT.writeRequestBuilder()
                 .addItem("cyclic", "STATE/cyclic:STRING", "initialcyclic")
@@ -159,7 +165,9 @@ class SimulatedConnectionTest implements WithAssertions {
                 .addItem("event", "STATE/event:STRING", "initialevent")
                 .build();
             SUT.write(plcWriteRequest).get(1, TimeUnit.SECONDS);
+            // Note: as we don't have a subscription yet, no callback will be executed
 
+            LOGGER.trace("subscribe");
             // Subscribe for the addresses
             PlcSubscriptionRequest plcSubscriptionRequest = SUT.subscriptionRequestBuilder()
                 .addCyclicField("cyclic", "STATE/cyclic:String", Duration.ofSeconds(1))
@@ -168,6 +176,7 @@ class SimulatedConnectionTest implements WithAssertions {
                 .build();
             PlcSubscriptionResponse plcSubscriptionResponse = SUT.subscribe(plcSubscriptionRequest).get(1, TimeUnit.SECONDS);
 
+            LOGGER.trace("register handler");
             // Register some handlers for the subscriptions that simply put the messages in a queue.
             Queue<PlcSubscriptionEvent> cyclicQueue = new ConcurrentLinkedQueue<>();
             PlcConsumerRegistration cyclicRegistration = plcSubscriptionResponse.getSubscriptionHandle("cyclic").register(cyclicQueue::add);
@@ -177,9 +186,11 @@ class SimulatedConnectionTest implements WithAssertions {
             PlcConsumerRegistration eventRegistration = plcSubscriptionResponse.getSubscriptionHandle("event").register(eventQueue::add);
             assertThat(plcSubscriptionResponse.getFieldNames()).isNotEmpty();
 
+            LOGGER.trace("giving time");
             // Give the system some time to do stuff
-            TimeUnit.SECONDS.sleep(10);
+            TimeUnit.SECONDS.sleep(2);
 
+            LOGGER.trace("write some addresses");
             // Write something to the addresses in order to trigger a value-change event
             PlcWriteRequest plcWriteRequest2 = SUT.writeRequestBuilder()
                 .addItem("cyclic", "STATE/cyclic:STRING", "changedcyclic")
@@ -188,11 +199,17 @@ class SimulatedConnectionTest implements WithAssertions {
                 .build();
             SUT.write(plcWriteRequest2).get(10, TimeUnit.SECONDS);
 
+            LOGGER.trace("giving time again");
+            // Give the system some time to do stuff
+            TimeUnit.SECONDS.sleep(2);
+
+            LOGGER.trace("unregister");
             // Unregister all consumers
             cyclicRegistration.unregister();
             stateRegistration.unregister();
             eventRegistration.unregister();
 
+            LOGGER.trace("assertions");
             // The cyclic queue should not be empty as it had 10 seconds to get a value once per second
             assertThat(cyclicQueue).isNotEmpty();
             cyclicQueue.forEach(
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
index a4ebd18..1317d45 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/messages/DefaultPlcSubscriptionRequest.java
@@ -46,7 +46,7 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
 
     private final PlcSubscriber subscriber;
 
-    private LinkedHashMap<String, PlcSubscriptionField> fields;
+    private final LinkedHashMap<String, PlcSubscriptionField> fields;
 
     @JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
     public DefaultPlcSubscriptionRequest(@JsonProperty("subscriber") PlcSubscriber subscriber,
@@ -180,4 +180,11 @@ public class DefaultPlcSubscriptionRequest implements PlcSubscriptionRequest, Se
 
     }
 
+    @Override
+    public String toString() {
+        return "DefaultPlcSubscriptionRequest{" +
+            "subscriber=" + subscriber +
+            ", fields=" + fields +
+            '}';
+    }
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionField.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionField.java
index ce22970..267bf56 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionField.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionField.java
@@ -53,4 +53,13 @@ public class DefaultPlcSubscriptionField implements PlcSubscriptionField {
     public Optional<Duration> getDuration() {
         return Optional.ofNullable(duration);
     }
+
+    @Override
+    public String toString() {
+        return "DefaultPlcSubscriptionField{" +
+            "plcSubscriptionType=" + plcSubscriptionType +
+            ", plcField=" + plcField +
+            ", duration=" + duration +
+            '}';
+    }
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionHandle.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionHandle.java
index c9d687b..2fba6b4 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionHandle.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/model/DefaultPlcSubscriptionHandle.java
@@ -30,7 +30,7 @@ import java.util.function.Consumer;
 // Warning: do not override equals and hashCode as these should not include the plcSubscriber.
 public class DefaultPlcSubscriptionHandle implements PlcSubscriptionHandle {
 
-    private final PlcSubscriber plcSubscriber;
+    private final transient PlcSubscriber plcSubscriber;
 
     public DefaultPlcSubscriptionHandle(PlcSubscriber plcSubscriber) {
         this.plcSubscriber = plcSubscriber;
@@ -43,6 +43,9 @@ public class DefaultPlcSubscriptionHandle implements PlcSubscriptionHandle {
 
     @Override
     public boolean equals(Object obj) {
+        if (!(obj instanceof DefaultPlcSubscriptionHandle)) {
+            return false;
+        }
         // A handle is unique therefore we use the default implementation from Object
         return (this == obj);
     }