You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/26 16:18:33 UTC

[incubator-plc4x] branch feature/TopLevelItemSpliting updated (a0c3607 -> 9e25460)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


 discard a0c3607  [General] SingleItemToSingleRequestProtocol implemented ErrorHandler
 discard 31e2acb  [General] some progress on the SingleItemToSingleRequestProtocol
 discard 7e8dd94  Introduced protocol layer to split requestItems to several requests.
     add 523ada0  [General] added some utility methods to requests and responses.
     new 10cf16a  Introduced protocol layer to split requestItems to several requests.
     new be82ba7  [General] some progress on the SingleItemToSingleRequestProtocol
     new 9e25460  [General] SingleItemToSingleRequestProtocol implemented ErrorHandler

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a0c3607)
            \
             N -- N -- N   refs/heads/feature/TopLevelItemSpliting (9e25460)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[incubator-plc4x] 03/03: [General] SingleItemToSingleRequestProtocol implemented ErrorHandler

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 9e25460948d2769638d79dc792ff4b157f5e1789
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Sep 26 18:07:56 2018 +0200

    [General] SingleItemToSingleRequestProtocol implemented ErrorHandler
---
 .../plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 267f9a8..0fdebf5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -118,7 +118,11 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     }
 
     private void errored(int correlationId, Throwable throwable) {
-
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
+        if (plcRequestContainer == null) {
+            throw new PlcRuntimeException("Unrelated error received ", throwable);
+        }
+        plcRequestContainer.getResponseFuture().completeExceptionally(throwable);
     }
 
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////


[incubator-plc4x] 01/03: Introduced protocol layer to split requestItems to several requests.

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 10cf16a7c47d113d88f5d19ada479475442bd0fc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jul 5 13:20:43 2018 +0200

    Introduced protocol layer to split requestItems to several requests.
---
 .../base/messages/item/CorrelatedRequestItem.java  |  81 +++++++++
 .../base/messages/item/CorrelatedResponseItem.java |  70 ++++++++
 .../SingleItemToSingleRequestProtocol.java         | 193 +++++++++++++++++++++
 3 files changed, 344 insertions(+)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
new file mode 100644
index 0000000..5212618
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
@@ -0,0 +1,81 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.messages.item;
+
+import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.PlcResponse;
+import org.apache.plc4x.java.api.messages.items.RequestItem;
+
+import java.util.Objects;
+
+public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
+
+    private final int correlationId;
+
+    private final REQUEST_ITEM requestItem;
+
+    private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer;
+
+    public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
+        this.correlationId = correlationId;
+        this.requestItem = requestItem;
+        this.plcRequestContainer = plcRequestContainer;
+    }
+
+    public int getCorrelationId() {
+        return correlationId;
+    }
+
+    public REQUEST_ITEM getRequestItem() {
+        return requestItem;
+    }
+
+    public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() {
+        return plcRequestContainer;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof CorrelatedRequestItem)) {
+            return false;
+        }
+        CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
+        return correlationId == that.correlationId &&
+            Objects.equals(requestItem, that.requestItem) &&
+            Objects.equals(plcRequestContainer, that.plcRequestContainer);
+    }
+
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(correlationId, requestItem, plcRequestContainer);
+    }
+
+    @Override
+    public String toString() {
+        return "CorrelatedRequestItem{" +
+            "correlationId=" + correlationId +
+            ", requestItem=" + requestItem +
+            ", plcRequestContainer=" + plcRequestContainer +
+            '}';
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
new file mode 100644
index 0000000..38a9032
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
@@ -0,0 +1,70 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ 
+   http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.messages.item;
+
+import org.apache.plc4x.java.api.messages.items.ResponseItem;
+
+import java.util.Objects;
+
+public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
+
+    private final int correlationId;
+
+    private final RESPONSE_ITEM responseItem;
+
+    public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) {
+        this.correlationId = correlationId;
+        this.responseItem = responseItem;
+    }
+
+    public int getCorrelationId() {
+        return correlationId;
+    }
+
+    public RESPONSE_ITEM getResponseItem() {
+        return responseItem;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof CorrelatedResponseItem)) {
+            return false;
+        }
+        CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
+        return correlationId == that.correlationId &&
+            Objects.equals(responseItem, that.responseItem);
+    }
+
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(correlationId, responseItem);
+    }
+
+    @Override
+    public String toString() {
+        return "CorrelatedResponseItem{" +
+            "correlationId=" + correlationId +
+            ", responseItem=" + responseItem +
+            '}';
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
new file mode 100644
index 0000000..8c0e272
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -0,0 +1,193 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.protocol;
+
+import io.netty.channel.*;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.PromiseCombiner;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.RequestItem;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
+import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
+import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
+
+    private PendingWriteQueue queue;
+
+    private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems;
+
+    private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
+
+    private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered;
+
+    private AtomicInteger correlationId;
+
+    private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() {
+
+        @Override
+        protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception {
+            SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
+        }
+    };
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        this.queue = new PendingWriteQueue(ctx);
+        this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>();
+        this.containerCorrelationIdMap = new ConcurrentHashMap<>();
+        this.correlationId = new AtomicInteger();
+        super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        this.queue.removeAndWriteAll();
+        super.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // Send everything so we get a proper failure for those pending writes
+        this.queue.removeAndWriteAll();
+        super.channelInactive(ctx);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Decoding
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        decoder.channelRead(ctx, msg);
+        super.read(ctx);
+    }
+
+    private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException {
+        int correlationId = msg.getCorrelationId();
+        CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId);
+        if (correlatedRequestItem == null) {
+            throw new PlcProtocolException("Unrelated package received " + msg);
+        }
+        PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer();
+        List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+        correlatedResponseItems.add(msg);
+        Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
+        integers.remove(correlationId);
+        if (integers.isEmpty()) {
+            PlcResponse<?, ?, ?> plcResponse;
+            if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) {
+                TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest();
+                plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) {
+                plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) {
+                plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) {
+                plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            } else {
+                throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest());
+            }
+            plcRequestContainer.getResponseFuture().complete(plcResponse);
+            responsesToBeDevliered.remove(plcRequestContainer);
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Encoding
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof PlcRequestContainer) {
+            PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+            Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
+
+            // Create a promise that has to be called multiple times.
+            PromiseCombiner promiseCombiner = new PromiseCombiner();
+            PlcRequest<?> request = in.getRequest();
+            for (RequestItem<?> item : request.getRequestItems()) {
+                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                int tdpu = correlationId.getAndIncrement();
+                queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise);
+                if (!tdpus.add(tdpu)) {
+                    throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                }
+                promiseCombiner.add((Future) subPromise);
+            }
+            promiseCombiner.finish(promise);
+
+            // Start sending the queue content.
+            trySendingMessages(ctx);
+        } else {
+            super.write(ctx, msg, promise);
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Helpers
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
+        while (queue.size() > 0) {
+            // Get the RequestItem that is up next in the queue.
+            CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current();
+
+            if (currentItem == null) {
+                break;
+            }
+            // Send the TPDU.
+            try {
+                ChannelFuture channelFuture = queue.removeAndWrite();
+                ctx.flush();
+                if (channelFuture == null) {
+                    break;
+                }
+            } catch (Exception e) {
+                LOGGER.error("Error sending more queues messages", e);
+                ctx.fireExceptionCaught(e);
+            }
+
+            // Add it to the list of sentButUnacknowledgedRequestItems.
+            sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem);
+
+            LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId());
+        }
+        ctx.flush();
+    }
+}


[incubator-plc4x] 02/03: [General] some progress on the SingleItemToSingleRequestProtocol

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit be82ba7ca4d7d569cc50e532a567b258df32120c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Sep 26 18:01:46 2018 +0200

    [General] some progress on the SingleItemToSingleRequestProtocol
---
 .../base/messages/item/CorrelatedRequestItem.java  |  81 --------
 .../base/messages/item/CorrelatedResponseItem.java |  70 -------
 .../SingleItemToSingleRequestProtocol.java         | 217 +++++++++++++++------
 3 files changed, 153 insertions(+), 215 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
deleted file mode 100644
index 5212618..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
-import org.apache.plc4x.java.api.messages.PlcResponse;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-
-import java.util.Objects;
-
-public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
-
-    private final int correlationId;
-
-    private final REQUEST_ITEM requestItem;
-
-    private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer;
-
-    public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
-        this.correlationId = correlationId;
-        this.requestItem = requestItem;
-        this.plcRequestContainer = plcRequestContainer;
-    }
-
-    public int getCorrelationId() {
-        return correlationId;
-    }
-
-    public REQUEST_ITEM getRequestItem() {
-        return requestItem;
-    }
-
-    public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() {
-        return plcRequestContainer;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof CorrelatedRequestItem)) {
-            return false;
-        }
-        CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
-        return correlationId == that.correlationId &&
-            Objects.equals(requestItem, that.requestItem) &&
-            Objects.equals(plcRequestContainer, that.plcRequestContainer);
-    }
-
-    @Override
-    public int hashCode() {
-
-        return Objects.hash(correlationId, requestItem, plcRequestContainer);
-    }
-
-    @Override
-    public String toString() {
-        return "CorrelatedRequestItem{" +
-            "correlationId=" + correlationId +
-            ", requestItem=" + requestItem +
-            ", plcRequestContainer=" + plcRequestContainer +
-            '}';
-    }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
deleted file mode 100644
index 38a9032..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
- 
-   http://www.apache.org/licenses/LICENSE-2.0
- 
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.items.ResponseItem;
-
-import java.util.Objects;
-
-public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
-
-    private final int correlationId;
-
-    private final RESPONSE_ITEM responseItem;
-
-    public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) {
-        this.correlationId = correlationId;
-        this.responseItem = responseItem;
-    }
-
-    public int getCorrelationId() {
-        return correlationId;
-    }
-
-    public RESPONSE_ITEM getResponseItem() {
-        return responseItem;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof CorrelatedResponseItem)) {
-            return false;
-        }
-        CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
-        return correlationId == that.correlationId &&
-            Objects.equals(responseItem, that.responseItem);
-    }
-
-    @Override
-    public int hashCode() {
-
-        return Objects.hash(correlationId, responseItem);
-    }
-
-    @Override
-    public String toString() {
-        return "CorrelatedResponseItem{" +
-            "correlationId=" + correlationId +
-            ", responseItem=" + responseItem +
-            '}';
-    }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 8c0e272..267f9a8 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -19,52 +19,39 @@
 package org.apache.plc4x.java.base.protocol;
 
 import io.netty.channel.*;
-import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.PromiseCombiner;
-import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
-import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
-import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
+// TODO: write test
 public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
 
     private PendingWriteQueue queue;
 
-    private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems;
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
 
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
 
-    private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered;
+    private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDevliered;
 
     private AtomicInteger correlationId;
 
-    private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() {
-
-        @Override
-        protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception {
-            SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
-        }
-    };
-
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         this.queue = new PendingWriteQueue(ctx);
@@ -91,42 +78,49 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     // Decoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        decoder.channelRead(ctx, msg);
-        super.read(ctx);
-    }
-
-    private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException {
-        int correlationId = msg.getCorrelationId();
-        CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId);
-        if (correlatedRequestItem == null) {
-            throw new PlcProtocolException("Unrelated package received " + msg);
+    private void tryFinish(int correlationId, InternalPlcResponse msg) {
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
+        if (plcRequestContainer == null) {
+            throw new PlcRuntimeException("Unrelated package received " + msg);
         }
-        PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer();
-        List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+        List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
         correlatedResponseItems.add(msg);
         Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
         integers.remove(correlationId);
         if (integers.isEmpty()) {
-            PlcResponse<?, ?, ?> plcResponse;
-            if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) {
-                TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest();
-                plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) {
-                plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) {
-                plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) {
-                plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            InternalPlcResponse<?> plcResponse;
+            if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
+                InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+                HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
+
+                correlatedResponseItems.stream()
+                    .map(InternalPlcReadResponse.class::cast)
+                    .map(InternalPlcReadResponse::getValues)
+                    .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
+
+                plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields);
+            } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
+                InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest();
+                HashMap<String, PlcResponseCode> values = new HashMap<>();
+
+                correlatedResponseItems.stream()
+                    .map(InternalPlcWriteResponse.class::cast)
+                    .map(InternalPlcWriteResponse::getValues)
+                    .forEach(stringPairMap -> stringPairMap.forEach(values::put));
+
+                plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
             } else {
-                throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest());
+                throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest());
             }
             plcRequestContainer.getResponseFuture().complete(plcResponse);
             responsesToBeDevliered.remove(plcRequestContainer);
         }
     }
 
+    private void errored(int correlationId, Throwable throwable) {
+
+    }
+
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
     // Encoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -134,22 +128,68 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         if (msg instanceof PlcRequestContainer) {
-            PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+            @SuppressWarnings("unchecked")
+            PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
             Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
 
             // Create a promise that has to be called multiple times.
             PromiseCombiner promiseCombiner = new PromiseCombiner();
-            PlcRequest<?> request = in.getRequest();
-            for (RequestItem<?> item : request.getRequestItems()) {
-                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+            InternalPlcRequest request = in.getRequest();
+            if (request instanceof InternalPlcFieldRequest) {
+                InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
+
+                if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+                    InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
+                    // TODO: repackage
+                    internalPlcReadRequest.getNamedFields().forEach(field -> {
+                        ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                        int tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+                            .thenApply(InternalPlcResponse.class::cast)
+                            .whenComplete((internalPlcResponse, throwable) -> {
+                                if (throwable != null) {
+                                    errored(tdpu, throwable);
+                                } else {
+                                    tryFinish(tdpu, internalPlcResponse);
+                                }
+                            });
+                        queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise);
+                        if (!tdpus.add(tdpu)) {
+                            throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        }
+                        promiseCombiner.add((Future) subPromise);
+                    });
+                }
+                if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+                    InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
+                    // TODO: repackage
+                    internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
+                        ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
 
-                int tdpu = correlationId.getAndIncrement();
-                queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise);
-                if (!tdpus.add(tdpu)) {
-                    throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        int tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+                            .thenApply(InternalPlcResponse.class::cast)
+                            .whenComplete((internalPlcResponse, throwable) -> {
+                                if (throwable != null) {
+                                    errored(tdpu, throwable);
+                                } else {
+                                    tryFinish(tdpu, internalPlcResponse);
+                                }
+                            });
+                        queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise);
+                        if (!tdpus.add(tdpu)) {
+                            throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        }
+                        promiseCombiner.add((Future) subPromise);
+                    });
                 }
+            } else {
+                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+                queue.add(msg, subPromise);
                 promiseCombiner.add((Future) subPromise);
             }
+
             promiseCombiner.finish(promise);
 
             // Start sending the queue content.
@@ -166,11 +206,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
         while (queue.size() > 0) {
             // Get the RequestItem that is up next in the queue.
-            CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current();
+            PlcRequestContainer currentItem = (PlcRequestContainer) queue.current();
+            InternalPlcRequest request = currentItem.getRequest();
 
-            if (currentItem == null) {
-                break;
-            }
             // Send the TPDU.
             try {
                 ChannelFuture channelFuture = queue.removeAndWrite();
@@ -183,11 +221,62 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                 ctx.fireExceptionCaught(e);
             }
 
-            // Add it to the list of sentButUnacknowledgedRequestItems.
-            sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem);
+            if (request instanceof CorrelatedPlcRequest) {
+                CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request;
+
+                // Add it to the list of sentButUnacknowledgedRequestItems.
+                sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem);
 
-            LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId());
+                LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu());
+            }
         }
         ctx.flush();
     }
+
+    interface CorrelatedPlcRequest extends InternalPlcRequest {
+
+        int getTdpu();
+    }
+
+    private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
+
+        private final int tdpu;
+
+        public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
+            super(fields);
+            this.tdpu = tdpu;
+        }
+
+        public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+            LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+            fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
+            return new CorrelatedPlcReadRequest(fields, tdpu);
+        }
+
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
+
+    private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+
+        private final int tdpu;
+
+        public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+            super(fields);
+            this.tdpu = tdpu;
+        }
+
+        public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+            LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
+            fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
+            return new CorrelatedPlcWriteRequest(fields, tdpu);
+        }
+
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
 }