You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/27 10:15:58 UTC

[incubator-plc4x] 07/07: [General] SingleItemToSingleRequestProtocol fixed a bunch of bugs and added tests

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 5951fc5e1ecc208363f653175c922b7ca2a3b53a
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 27 12:13:30 2018 +0200

    [General] SingleItemToSingleRequestProtocol fixed a bunch of bugs and added tests
---
 .../SingleItemToSingleRequestProtocol.java         | 125 ++++++---
 .../SingleItemToSingleRequestProtocolTest.java     | 290 +++++++++++++++++++++
 pom.xml                                            |  34 ++-
 3 files changed, 399 insertions(+), 50 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index fa578f2..724ffad 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -36,29 +36,50 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * This layer can be used to split a {@link org.apache.plc4x.java.api.messages.PlcRequest} which addresses multiple {@link PlcField}s into multiple subsequent {@link org.apache.plc4x.java.api.messages.PlcRequest}s.
  */
-// TODO: write test
 public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
 
     private PendingWriteQueue queue;
 
-    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
+    // Map to track send subcontainers
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
 
+    // Map to map tdpu to original parent container
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> correlationToParentContainer;
+
+    // Map to track tdpus per container
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
 
+    // Map to track a list of responses per parent container
     private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDelivered;
 
     private AtomicInteger correlationId;
 
+    public SingleItemToSingleRequestProtocol() {
+        this(true);
+    }
+
+    public SingleItemToSingleRequestProtocol(boolean betterImplementationPossible) {
+        if (betterImplementationPossible) {
+            String callStack = Arrays.stream(Thread.currentThread().getStackTrace())
+                .map(StackTraceElement::toString)
+                .collect(Collectors.joining("\n"));
+            LOGGER.warn("Unoptimized Usage of {} detected at:\n{}", this.getClass(), callStack);
+            LOGGER.info("Consider implementing item splitting native to the protocol.");
+        }
+    }
+
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         this.queue = new PendingWriteQueue(ctx);
-        this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>();
+        this.sentButUnacknowledgedSubContainer = new ConcurrentHashMap<>();
+        this.correlationToParentContainer = new ConcurrentHashMap<>();
         this.containerCorrelationIdMap = new ConcurrentHashMap<>();
         this.responsesToBeDelivered = new ConcurrentHashMap<>();
         this.correlationId = new AtomicInteger();
@@ -68,6 +89,11 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
         this.queue.removeAndWriteAll();
+        this.sentButUnacknowledgedSubContainer.clear();
+        this.correlationToParentContainer.clear();
+        this.containerCorrelationIdMap.clear();
+        this.responsesToBeDelivered.clear();
+        this.correlationId.set(0);
         super.channelUnregistered(ctx);
     }
 
@@ -82,20 +108,22 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     // Decoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    private void tryFinish(int correlationId, InternalPlcResponse msg) {
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
-        if (plcRequestContainer == null) {
+    protected void tryFinish(Integer correlationId, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+        LOGGER.info("{} got acknowledged", subPlcRequestContainer);
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId);
+        if (originalPlcRequestContainer == null) {
             LOGGER.warn("Unrelated package received {}", msg);
             return;
         }
-        List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+        List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new LinkedList<>());
         correlatedResponseItems.add(msg);
-        Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
+        Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer);
         integers.remove(correlationId);
         if (integers.isEmpty()) {
             InternalPlcResponse<?> plcResponse;
-            if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
-                InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+            if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
+                InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest();
                 HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
 
                 correlatedResponseItems.stream()
@@ -104,8 +132,8 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                     .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
 
                 plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields);
-            } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
-                InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest();
+            } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
+                InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) originalPlcRequestContainer.getRequest();
                 HashMap<String, PlcResponseCode> values = new HashMap<>();
 
                 correlatedResponseItems.stream()
@@ -115,20 +143,19 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
                 plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
             } else {
-                throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest());
+                throw new PlcRuntimeException("Unknown type detected " + originalPlcRequestContainer.getRequest());
             }
-            plcRequestContainer.getResponseFuture().complete(plcResponse);
-            responsesToBeDelivered.remove(plcRequestContainer);
+            responsesToBeDelivered.remove(originalPlcRequestContainer);
+            containerCorrelationIdMap.remove(originalPlcRequestContainer);
+            originalResponseFuture.complete(plcResponse);
         }
     }
 
-    private void errored(int correlationId, Throwable throwable) {
-        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
-        if (plcRequestContainer == null) {
-            LOGGER.warn("Unrelated error received ", throwable);
-            return;
-        }
-        plcRequestContainer.getResponseFuture().completeExceptionally(throwable);
+    protected void errored(int correlationId, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+        // TODO: cleanup missing maps as the complete response gets canceled now.
+        LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", plcRequestContainer, correlationId, throwable);
+        originalResponseFuture.completeExceptionally(throwable);
     }
 
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -153,17 +180,21 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                     internalPlcReadRequest.getNamedFields().forEach(field -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
 
-                        int tdpu = correlationId.getAndIncrement();
-                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+                        Integer tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
+                        // Important: don't chain to above as we want the above to be completed not the result of when complete
+                        correlatedCompletableFuture
                             .thenApply(InternalPlcResponse.class::cast)
                             .whenComplete((internalPlcResponse, throwable) -> {
                                 if (throwable != null) {
-                                    errored(tdpu, throwable);
+                                    errored(tdpu, throwable, in.getResponseFuture());
                                 } else {
-                                    tryFinish(tdpu, internalPlcResponse);
+                                    tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
                                 }
                             });
-                        queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise);
+                        PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture);
+                        correlationToParentContainer.put(tdpu, in);
+                        queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
                             throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
                         }
@@ -175,17 +206,19 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                     internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
                         ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
 
-                        int tdpu = correlationId.getAndIncrement();
+                        Integer tdpu = correlationId.getAndIncrement();
                         CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
                             .thenApply(InternalPlcResponse.class::cast)
                             .whenComplete((internalPlcResponse, throwable) -> {
                                 if (throwable != null) {
-                                    errored(tdpu, throwable);
+                                    errored(tdpu, throwable, in.getResponseFuture());
                                 } else {
-                                    tryFinish(tdpu, internalPlcResponse);
+                                    tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
                                 }
                             });
-                        queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise);
+                        PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture);
+                        correlationToParentContainer.put(tdpu, in);
+                        queue.add(correlatedPlcRequestContainer, subPromise);
                         if (!tdpus.add(tdpu)) {
                             throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
                         }
@@ -232,30 +265,30 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
             if (request instanceof CorrelatedPlcRequest) {
                 CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request;
 
-                // Add it to the list of sentButUnacknowledgedRequestItems.
-                sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem);
+                // Add it to the list of sentButUnacknowledgedSubContainer.
+                sentButUnacknowledgedSubContainer.put(correlatedPlcRequest.getTdpu(), currentItem);
 
-                LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu());
+                LOGGER.debug("container with id {} sent: ", correlatedPlcRequest.getTdpu(), currentItem);
             }
         }
         ctx.flush();
     }
 
-    interface CorrelatedPlcRequest extends InternalPlcRequest {
+    protected interface CorrelatedPlcRequest extends InternalPlcRequest {
 
         int getTdpu();
     }
 
-    private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
+    protected static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
 
-        private final int tdpu;
+        protected final int tdpu;
 
-        public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
+        protected CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
             super(fields);
             this.tdpu = tdpu;
         }
 
-        public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+        protected static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
             LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
             fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
             return new CorrelatedPlcReadRequest(fields, tdpu);
@@ -267,7 +300,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
         }
     }
 
-    private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+    protected static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
 
         private final int tdpu;
 
@@ -287,4 +320,16 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
             return tdpu;
         }
     }
+
+    // TODO: maybe export to jmx
+    public Map<String, Integer> getStatistics() {
+        HashMap<String, Integer> statistics = new HashMap<>();
+        statistics.put("queue", queue.size());
+        statistics.put("sentButUnacknowledgedSubContainer", sentButUnacknowledgedSubContainer.size());
+        statistics.put("correlationToParentContainer", correlationToParentContainer.size());
+        statistics.put("containerCorrelationIdMap", containerCorrelationIdMap.size());
+        statistics.put("responsesToBeDelivered", responsesToBeDelivered.size());
+        statistics.put("currentCorrelationId", correlationId.get());
+        return statistics;
+    }
 }
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
new file mode 100644
index 0000000..9a1c4ea
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -0,0 +1,290 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+
+package org.apache.plc4x.java.base.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.messages.PlcFieldRequest;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.*;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class SingleItemToSingleRequestProtocolTest implements WithAssertions {
+
+    @InjectMocks
+    SingleItemToSingleRequestProtocol SUT;
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    ChannelHandlerContext channelHandlerContext;
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    ChannelPromise channelPromise;
+
+    @Mock
+    CompletableFuture<InternalPlcResponse> responseCompletableFuture;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        SUT.channelRegistered(channelHandlerContext);
+        when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        SUT.channelUnregistered(channelHandlerContext);
+    }
+
+    @Nested
+    class Misc {
+        @Test
+        void channelRegistered() throws Exception {
+            SUT.channelRegistered(channelHandlerContext);
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 0)
+            );
+        }
+
+        @Test
+        void channelUnregistered() throws Exception {
+            SUT.channelUnregistered(channelHandlerContext);
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 0)
+            );
+        }
+
+        @Test
+        void channelInactive() throws Exception {
+            SUT.channelInactive(channelHandlerContext);
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 0)
+            );
+        }
+    }
+
+    @Nested
+    class Roundtrip {
+        @Captor
+        ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+        @Test
+        void simpleRead() throws Exception {
+            // Given
+            // we have a simple read
+            PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            // When
+            // we write this
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // And
+            // and we simulate that all get responded
+            verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+            List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+            capturedDownstreamContainers.forEach(plcRequestContainer -> {
+                InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+                String fieldName = request.getFieldNames().iterator().next();
+                CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+                HashMap<String, Pair<PlcResponseCode, FieldItem>> responseFields = new HashMap<>();
+                responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(FieldItem.class)));
+                responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
+            });
+            // Then
+            // our complete container should complete normally
+            verify(responseCompletableFuture).complete(any());
+            // And we should have no memory leak
+            assertThat(SUT.getStatistics()).containsOnly(
+                entry("queue", 0),
+                entry("sentButUnacknowledgedSubContainer", 0),
+                entry("correlationToParentContainer", 0),
+                entry("containerCorrelationIdMap", 0),
+                entry("responsesToBeDelivered", 0),
+                entry("currentCorrelationId", 5)
+            );
+        }
+    }
+
+    @Nested
+    class Decoding {
+        @Test
+        void tryFinish() throws Exception {
+            SUT.tryFinish(1, null, new CompletableFuture<>());
+            // TODO: add Assertions.
+        }
+
+        @Test
+        void errored() throws Exception {
+            SUT.errored(1, mock(Throwable.class), new CompletableFuture<>());
+            // TODO: add Assertions.
+        }
+    }
+
+    @Nested
+    class Encoding {
+
+        @Captor
+        ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+        @Test
+        void empty() throws Exception {
+            // Given
+            Object msg = null;
+            // When
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // Then
+            verify(channelHandlerContext, times(1)).write(null, channelPromise);
+        }
+
+        @Test
+        void read() throws Exception {
+            // Given
+            PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+            // When
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // Then
+            verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+            List<PlcRequestContainer> capturedValues = plcRequestContainerArgumentCaptor.getAllValues();
+            // We check if every request as exactly one field
+            assertThat(capturedValues)
+                .allMatch(plcRequestContainer -> plcRequestContainer.getRequest() instanceof SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest)
+                .allMatch(plcRequestContainer -> ((SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest) plcRequestContainer.getRequest()).getNumberOfFields() == 1);
+            // In sum we should see all fields
+            List<String> fieldNamesList = capturedValues.stream()
+                .map(PlcRequestContainer::getRequest)
+                .map(PlcFieldRequest.class::cast)
+                .map(PlcFieldRequest::getFieldNames)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+            // There should be no duplications
+            assertThat(fieldNamesList).hasSize(5);
+            assertThat(fieldNamesList).containsExactly(
+                "readField1",
+                "readField2",
+                "readField3",
+                "readField4",
+                "readField5"
+            );
+        }
+
+        @Test
+        void write() throws Exception {
+            // Given
+            PlcRequestContainer<TestDefaultPlcWriteRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(), responseCompletableFuture);
+            // When
+            SUT.write(channelHandlerContext, msg, channelPromise);
+            // Then
+            verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+            List<PlcRequestContainer> capturedValues = plcRequestContainerArgumentCaptor.getAllValues();
+            // We check if every request as exactly one field
+            assertThat(capturedValues)
+                .allMatch(plcRequestContainer -> plcRequestContainer.getRequest() instanceof SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest)
+                .allMatch(plcRequestContainer -> ((SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest) plcRequestContainer.getRequest()).getNumberOfFields() == 1);
+            // In sum we should see all fields
+            List<String> fieldNamesList = capturedValues.stream()
+                .map(PlcRequestContainer::getRequest)
+                .map(PlcFieldRequest.class::cast)
+                .map(PlcFieldRequest::getFieldNames)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+            // There should be no duplications
+            assertThat(fieldNamesList).hasSize(5);
+            assertThat(fieldNamesList).containsExactly(
+                "writeField1",
+                "writeField2",
+                "writeField3",
+                "writeField4",
+                "writeField5"
+            );
+        }
+
+        @Test
+        void trySendingMessages() throws Exception {
+            SUT.trySendingMessages(channelHandlerContext);
+            // TODO: add assertions
+        }
+    }
+
+    private static class TestDefaultPlcReadRequest extends DefaultPlcReadRequest {
+
+        private TestDefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
+            super(fields);
+        }
+
+        private static TestDefaultPlcReadRequest build() {
+            LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+            fields.put("readField1", mock(PlcField.class));
+            fields.put("readField2", mock(PlcField.class));
+            fields.put("readField3", mock(PlcField.class));
+            fields.put("readField4", mock(PlcField.class));
+            fields.put("readField5", mock(PlcField.class));
+            return new TestDefaultPlcReadRequest(fields);
+        }
+    }
+
+    private static class TestDefaultPlcWriteRequest extends DefaultPlcWriteRequest {
+
+        private TestDefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+            super(fields);
+        }
+
+        private static TestDefaultPlcWriteRequest build() {
+            LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
+            fields.put("writeField1", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+            fields.put("writeField2", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+            fields.put("writeField3", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+            fields.put("writeField4", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+            fields.put("writeField5", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+            return new TestDefaultPlcWriteRequest(fields);
+        }
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b4b2867..d1ad775 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,8 @@
   limitations under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
   <modelVersion>4.0.0</modelVersion>
 
@@ -301,7 +302,7 @@
             </goals>
             <configuration>
               <rules>
-                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
               </rules>
             </configuration>
           </execution>
@@ -490,7 +491,9 @@
             </goals>
             <configuration>
               <url>https://stackpath.bootstrapcdn.com/bootswatch/${bootstrap.version}/flatly/bootstrap.min.css</url>
-              <outputDirectory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/</outputDirectory>
+              <outputDirectory>
+                ${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/
+              </outputDirectory>
               <outputFileName>bootstrap.min.css</outputFileName>
             </configuration>
           </execution>
@@ -501,7 +504,9 @@
               <goal>wget</goal>
             </goals>
             <configuration>
-              <url>https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip</url>
+              <url>
+                https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip
+              </url>
               <unpack>true</unpack>
               <outputDirectory>${project.build.directory}/dependency/fontawesome</outputDirectory>
             </configuration>
@@ -523,14 +528,18 @@
               <outputDirectory>${project.build.directory}/site</outputDirectory>
               <resources>
                 <resource>
-                  <directory>${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version}</directory>
+                  <directory>
+                    ${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version}
+                  </directory>
                   <targetPath>${project.build.directory}/site/js</targetPath>
                   <includes>
                     <include>anchor.min.js</include>
                   </includes>
                 </resource>
                 <resource>
-                  <directory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}</directory>
+                  <directory>
+                    ${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}
+                  </directory>
                   <includes>
                     <include>css/bootstrap.min.css</include>
                     <include>js/bootstrap.min.js</include>
@@ -538,17 +547,22 @@
                   </includes>
                 </resource>
                 <resource>
-                  <directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web</directory>
+                  <directory>
+                    ${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web
+                  </directory>
                   <includes>
                     <include>css/all.min.css</include>
                   </includes>
                 </resource>
                 <resource>
-                  <directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts</directory>
+                  <directory>
+                    ${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts
+                  </directory>
                   <targetPath>${project.build.directory}/site/fonts</targetPath>
                 </resource>
                 <resource>
-                  <directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version}</directory>
+                  <directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version}
+                  </directory>
                   <targetPath>${project.build.directory}/site/js</targetPath>
                   <includes>
                     <include>jquery.min.js</include>
@@ -860,7 +874,7 @@
                     </goals>
                   </pluginExecutionFilter>
                   <action>
-                    <ignore />
+                    <ignore/>
                   </action>
                 </pluginExecution>
               </pluginExecutions>