You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/26 16:11:20 UTC

[incubator-plc4x] 02/03: [General] some progress on the SingleItemToSingleRequestProtocol

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 31e2acba2d748ae7d5d9197be9057b707f943151
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Sep 26 18:01:46 2018 +0200

    [General] some progress on the SingleItemToSingleRequestProtocol
---
 .../java/base/messages/DefaultPlcReadRequest.java  |  12 +-
 .../java/base/messages/DefaultPlcReadResponse.java |   5 +
 .../messages/DefaultPlcSubscriptionRequest.java    |  15 +-
 .../messages/DefaultPlcUnsubscriptionRequest.java  |   6 +
 .../java/base/messages/DefaultPlcWriteRequest.java |  28 ++-
 .../base/messages/DefaultPlcWriteResponse.java     |   5 +
 .../base/messages/InternalPlcFieldRequest.java     |   6 +
 .../java/base/messages/InternalPlcReadRequest.java |   2 +-
 .../base/messages/InternalPlcReadResponse.java     |   6 +
 .../base/messages/InternalPlcWriteRequest.java     |   4 +
 .../base/messages/InternalPlcWriteResponse.java    |   5 +-
 .../base/messages/item/CorrelatedRequestItem.java  |  81 --------
 .../base/messages/item/CorrelatedResponseItem.java |  70 -------
 .../SingleItemToSingleRequestProtocol.java         | 217 +++++++++++++++------
 14 files changed, 238 insertions(+), 224 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
index ef9d7ba..9b60fe4 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
@@ -18,18 +18,20 @@ under the License.
 */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.connection.PlcFieldHandler;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPlcFieldRequest {
 
     private LinkedHashMap<String, PlcField> fields;
 
-    private DefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
+    protected DefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
         this.fields = fields;
     }
 
@@ -54,6 +56,14 @@ public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPl
         return new LinkedList<>(fields.values());
     }
 
+    @Override
+    public LinkedList<Pair<String, PlcField>> getNamedFields() {
+        return fields.entrySet()
+            .stream()
+            .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), stringPlcFieldEntry.getValue()))
+            .collect(Collectors.toCollection(LinkedList::new));
+    }
+
     public static class Builder implements PlcReadRequest.Builder {
 
         private final PlcFieldHandler fieldHandler;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java
index cd2dc47..73cad97 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java
@@ -76,6 +76,11 @@ public class DefaultPlcReadResponse implements InternalPlcReadResponse {
     }
 
     @Override
+    public Map<String, Pair<PlcResponseCode, FieldItem>> getValues() {
+        return values;
+    }
+
+    @Override
     public Object getObject(String name) {
         return getObject(name, 0);
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index f50f0ad..9f59cb3 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -34,27 +34,32 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
 
     @Override
     public int getNumberOfFields() {
-        return 0;
+        throw new IllegalStateException("not available");
     }
 
     @Override
     public LinkedHashSet<String> getFieldNames() {
-        return null;
+        throw new IllegalStateException("not available");
     }
 
     @Override
     public PlcField getField(String name) {
-        return null;
+        throw new IllegalStateException("not available");
     }
 
     @Override
     public LinkedList<PlcField> getFields() {
-        return null;
+        throw new IllegalStateException("not available");
     }
 
     @Override
     public PlcSubscriptionType getPlcSubscriptionType() {
-        return null;
+        throw new IllegalStateException("not available");
+    }
+
+    @Override
+    public LinkedList<Pair<String, PlcField>> getNamedFields() {
+        throw new IllegalStateException("not available");
     }
 
     public static class Builder implements PlcSubscriptionRequest.Builder {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 38a52f0..5dbccea 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
@@ -59,6 +60,11 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
         return internalPlcSubscriptionHandles;
     }
 
+    @Override
+    public LinkedList<Pair<String, PlcField>> getNamedFields() {
+        throw new IllegalStateException("not available");
+    }
+
     public static class Builder implements PlcUnsubscriptionRequest.Builder {
 
         private List<InternalPlcSubscriptionHandle> plcSubscriptionHandles;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
index 1eb3458..d7e4086 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.model.PlcField;
@@ -39,7 +40,7 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
 
     private final LinkedHashMap<String, Pair<PlcField, FieldItem>> fields;
 
-    private DefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+    protected DefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
         this.fields = fields;
     }
 
@@ -74,6 +75,31 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
     }
 
     @Override
+    public LinkedList<Pair<String, PlcField>> getNamedFields() {
+        return fields.entrySet()
+            .stream()
+            .map(stringPairEntry ->
+                Pair.of(
+                    stringPairEntry.getKey(),
+                    stringPairEntry.getValue().getKey()
+                )
+            ).collect(Collectors.toCollection(LinkedList::new));
+    }
+
+    @Override
+    public LinkedList<Triple<String, PlcField, FieldItem>> getNamedFieldTriples() {
+        return fields.entrySet()
+            .stream()
+            .map(stringPairEntry ->
+                Triple.of(
+                    stringPairEntry.getKey(),
+                    stringPairEntry.getValue().getKey(),
+                    stringPairEntry.getValue().getValue()
+                )
+            ).collect(Collectors.toCollection(LinkedList::new));
+    }
+
+    @Override
     public int getNumberOfValues(String name) {
         return fields.get(name).getValue().getNumberOfValues();
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java
index e7c5e83..a004f0f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java
@@ -35,6 +35,11 @@ public class DefaultPlcWriteResponse implements InternalPlcWriteResponse {
     }
 
     @Override
+    public Map<String, PlcResponseCode> getValues() {
+        return values;
+    }
+
+    @Override
     public InternalPlcWriteRequest getRequest() {
         return request;
     }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
index 55d508f..a7ee30c 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
@@ -18,8 +18,14 @@
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
+import org.apache.plc4x.java.api.model.PlcField;
+
+import java.util.LinkedList;
 
 public interface InternalPlcFieldRequest extends PlcFieldRequest {
 
+    LinkedList<Pair<String, PlcField>> getNamedFields();
+
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java
index b1cd4b0..c787f00 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java
@@ -20,7 +20,7 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 
-public interface InternalPlcReadRequest extends PlcReadRequest, InternalPlcRequest {
+public interface InternalPlcReadRequest extends PlcReadRequest, InternalPlcFieldRequest, InternalPlcRequest {
 
 }
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
index 27ae68e..775a4c7 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
@@ -18,8 +18,14 @@
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
+
+import java.util.Map;
 
 public interface InternalPlcReadResponse extends PlcReadResponse<InternalPlcReadRequest>, InternalPlcResponse<InternalPlcReadRequest> {
 
+    Map<String, Pair<PlcResponseCode, FieldItem>> getValues();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java
index a4bad51..f10cd89 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.plc4x.java.base.messages;
 
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.base.messages.items.FieldItem;
 
 import java.util.LinkedList;
@@ -28,4 +30,6 @@ public interface InternalPlcWriteRequest extends PlcWriteRequest, InternalPlcReq
     FieldItem getFieldItem(String name);
 
     LinkedList<FieldItem> getFieldItems();
+
+    LinkedList<Triple<String, PlcField, FieldItem>> getNamedFieldTriples();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
index 9d127a5..dbcdc5c 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
@@ -19,7 +19,10 @@
 package org.apache.plc4x.java.base.messages;
 
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
 
-public interface InternalPlcWriteResponse extends PlcWriteResponse<InternalPlcWriteRequest>, InternalPlcResponse<InternalPlcWriteRequest> {
+import java.util.Map;
 
+public interface InternalPlcWriteResponse extends PlcWriteResponse<InternalPlcWriteRequest>, InternalPlcResponse<InternalPlcWriteRequest> {
+    Map<String, PlcResponseCode> getValues();
 }
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
deleted file mode 100644
index 5212618..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
-import org.apache.plc4x.java.api.messages.PlcResponse;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-
-import java.util.Objects;
-
-public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
-
-    private final int correlationId;
-
-    private final REQUEST_ITEM requestItem;
-
-    private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer;
-
-    public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
-        this.correlationId = correlationId;
-        this.requestItem = requestItem;
-        this.plcRequestContainer = plcRequestContainer;
-    }
-
-    public int getCorrelationId() {
-        return correlationId;
-    }
-
-    public REQUEST_ITEM getRequestItem() {
-        return requestItem;
-    }
-
-    public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() {
-        return plcRequestContainer;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof CorrelatedRequestItem)) {
-            return false;
-        }
-        CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
-        return correlationId == that.correlationId &&
-            Objects.equals(requestItem, that.requestItem) &&
-            Objects.equals(plcRequestContainer, that.plcRequestContainer);
-    }
-
-    @Override
-    public int hashCode() {
-
-        return Objects.hash(correlationId, requestItem, plcRequestContainer);
-    }
-
-    @Override
-    public String toString() {
-        return "CorrelatedRequestItem{" +
-            "correlationId=" + correlationId +
-            ", requestItem=" + requestItem +
-            ", plcRequestContainer=" + plcRequestContainer +
-            '}';
-    }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
deleted file mode 100644
index 38a9032..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
- 
-   http://www.apache.org/licenses/LICENSE-2.0
- 
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.items.ResponseItem;
-
-import java.util.Objects;
-
-public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
-
-    private final int correlationId;
-
-    private final RESPONSE_ITEM responseItem;
-
-    public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) {
-        this.correlationId = correlationId;
-        this.responseItem = responseItem;
-    }
-
-    public int getCorrelationId() {
-        return correlationId;
-    }
-
-    public RESPONSE_ITEM getResponseItem() {
-        return responseItem;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof CorrelatedResponseItem)) {
-            return false;
-        }
-        CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
-        return correlationId == that.correlationId &&
-            Objects.equals(responseItem, that.responseItem);
-    }
-
-    @Override
-    public int hashCode() {
-
-        return Objects.hash(correlationId, responseItem);
-    }
-
-    @Override
-    public String toString() {
-        return "CorrelatedResponseItem{" +
-            "correlationId=" + correlationId +
-            ", responseItem=" + responseItem +
-            '}';
-    }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 8c0e272..267f9a8 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -19,52 +19,39 @@
 package org.apache.plc4x.java.base.protocol;
 
 import io.netty.channel.*;
-import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.PromiseCombiner;
-import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
-import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
-import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
+// TODO: write test
 public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
 
     private PendingWriteQueue queue;
 
-    private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems;
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
 
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
 
-    private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered;
+    private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDevliered;
 
     private AtomicInteger correlationId;
 
-    private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() {
-
-        @Override
-        protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception {
-            SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
-        }
-    };
-
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         this.queue = new PendingWriteQueue(ctx);
@@ -91,42 +78,49 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     // Decoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        decoder.channelRead(ctx, msg);
-        super.read(ctx);
-    }
-
-    private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException {
-        int correlationId = msg.getCorrelationId();
-        CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId);
-        if (correlatedRequestItem == null) {
-            throw new PlcProtocolException("Unrelated package received " + msg);
+    private void tryFinish(int correlationId, InternalPlcResponse msg) {
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
+        if (plcRequestContainer == null) {
+            throw new PlcRuntimeException("Unrelated package received " + msg);
         }
-        PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer();
-        List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+        List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
         correlatedResponseItems.add(msg);
         Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
         integers.remove(correlationId);
         if (integers.isEmpty()) {
-            PlcResponse<?, ?, ?> plcResponse;
-            if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) {
-                TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest();
-                plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) {
-                plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) {
-                plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) {
-                plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            InternalPlcResponse<?> plcResponse;
+            if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
+                InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+                HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
+
+                correlatedResponseItems.stream()
+                    .map(InternalPlcReadResponse.class::cast)
+                    .map(InternalPlcReadResponse::getValues)
+                    .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
+
+                plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields);
+            } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
+                InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest();
+                HashMap<String, PlcResponseCode> values = new HashMap<>();
+
+                correlatedResponseItems.stream()
+                    .map(InternalPlcWriteResponse.class::cast)
+                    .map(InternalPlcWriteResponse::getValues)
+                    .forEach(stringPairMap -> stringPairMap.forEach(values::put));
+
+                plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
             } else {
-                throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest());
+                throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest());
             }
             plcRequestContainer.getResponseFuture().complete(plcResponse);
             responsesToBeDevliered.remove(plcRequestContainer);
         }
     }
 
+    private void errored(int correlationId, Throwable throwable) {
+
+    }
+
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
     // Encoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -134,22 +128,68 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         if (msg instanceof PlcRequestContainer) {
-            PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+            @SuppressWarnings("unchecked")
+            PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
             Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
 
             // Create a promise that has to be called multiple times.
             PromiseCombiner promiseCombiner = new PromiseCombiner();
-            PlcRequest<?> request = in.getRequest();
-            for (RequestItem<?> item : request.getRequestItems()) {
-                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+            InternalPlcRequest request = in.getRequest();
+            if (request instanceof InternalPlcFieldRequest) {
+                InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
+
+                if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+                    InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
+                    // TODO: repackage
+                    internalPlcReadRequest.getNamedFields().forEach(field -> {
+                        ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                        int tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+                            .thenApply(InternalPlcResponse.class::cast)
+                            .whenComplete((internalPlcResponse, throwable) -> {
+                                if (throwable != null) {
+                                    errored(tdpu, throwable);
+                                } else {
+                                    tryFinish(tdpu, internalPlcResponse);
+                                }
+                            });
+                        queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise);
+                        if (!tdpus.add(tdpu)) {
+                            throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        }
+                        promiseCombiner.add((Future) subPromise);
+                    });
+                }
+                if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+                    InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
+                    // TODO: repackage
+                    internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
+                        ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
 
-                int tdpu = correlationId.getAndIncrement();
-                queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise);
-                if (!tdpus.add(tdpu)) {
-                    throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        int tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+                            .thenApply(InternalPlcResponse.class::cast)
+                            .whenComplete((internalPlcResponse, throwable) -> {
+                                if (throwable != null) {
+                                    errored(tdpu, throwable);
+                                } else {
+                                    tryFinish(tdpu, internalPlcResponse);
+                                }
+                            });
+                        queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise);
+                        if (!tdpus.add(tdpu)) {
+                            throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        }
+                        promiseCombiner.add((Future) subPromise);
+                    });
                 }
+            } else {
+                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+                queue.add(msg, subPromise);
                 promiseCombiner.add((Future) subPromise);
             }
+
             promiseCombiner.finish(promise);
 
             // Start sending the queue content.
@@ -166,11 +206,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
         while (queue.size() > 0) {
             // Get the RequestItem that is up next in the queue.
-            CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current();
+            PlcRequestContainer currentItem = (PlcRequestContainer) queue.current();
+            InternalPlcRequest request = currentItem.getRequest();
 
-            if (currentItem == null) {
-                break;
-            }
             // Send the TPDU.
             try {
                 ChannelFuture channelFuture = queue.removeAndWrite();
@@ -183,11 +221,62 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                 ctx.fireExceptionCaught(e);
             }
 
-            // Add it to the list of sentButUnacknowledgedRequestItems.
-            sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem);
+            if (request instanceof CorrelatedPlcRequest) {
+                CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request;
+
+                // Add it to the list of sentButUnacknowledgedRequestItems.
+                sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem);
 
-            LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId());
+                LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu());
+            }
         }
         ctx.flush();
     }
+
+    interface CorrelatedPlcRequest extends InternalPlcRequest {
+        /** Returns the id that trySendingMessages uses as the key when recording this request as sent but unacknowledged. */
+        int getTdpu();
+    }
+
+    private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
+        // A DefaultPlcReadRequest that additionally carries an immutable correlation id (called "tdpu" in this file).
+        private final int tdpu;
+        /** Passes the named fields to the DefaultPlcReadRequest constructor and stores the correlation id. */
+        public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
+            super(fields);
+            this.tdpu = tdpu;
+        }
+        /** Factory: builds a request whose field map holds only the given (name, field) pair, tagged with tdpu. */
+        public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+            LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+            fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
+            return new CorrelatedPlcReadRequest(fields, tdpu);
+        }
+        /** Returns the correlation id supplied at construction. */
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
+
+    private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+        // A DefaultPlcWriteRequest that additionally carries an immutable correlation id (called "tdpu" in this file).
+        private final int tdpu;
+        /** Passes the named (field, value) pairs to the DefaultPlcWriteRequest constructor and stores the correlation id. */
+        public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+            super(fields);
+            this.tdpu = tdpu;
+        }
+        /** Factory: builds a request holding only the given entry - left is the name, middle the field, right the value item. */
+        public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+            LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
+            fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
+            return new CorrelatedPlcWriteRequest(fields, tdpu);
+        }
+        /** Returns the correlation id supplied at construction. */
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
 }