You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/27 10:15:53 UTC

[incubator-plc4x] 02/07: [General] some progress on the SingleItemToSingleRequestProtocol

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit a53cbbf8e022409ef4c3aaa79c93b029e39375bf
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Sep 26 18:01:46 2018 +0200

    [General] some progress on the SingleItemToSingleRequestProtocol
---
 .../base/messages/item/CorrelatedRequestItem.java  |  81 --------
 .../base/messages/item/CorrelatedResponseItem.java |  70 -------
 .../SingleItemToSingleRequestProtocol.java         | 217 +++++++++++++++------
 3 files changed, 153 insertions(+), 215 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
deleted file mode 100644
index 5212618..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
-import org.apache.plc4x.java.api.messages.PlcResponse;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-
-import java.util.Objects;
-
-public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
-
-    private final int correlationId;
-
-    private final REQUEST_ITEM requestItem;
-
-    private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer;
-
-    public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
-        this.correlationId = correlationId;
-        this.requestItem = requestItem;
-        this.plcRequestContainer = plcRequestContainer;
-    }
-
-    public int getCorrelationId() {
-        return correlationId;
-    }
-
-    public REQUEST_ITEM getRequestItem() {
-        return requestItem;
-    }
-
-    public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() {
-        return plcRequestContainer;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof CorrelatedRequestItem)) {
-            return false;
-        }
-        CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
-        return correlationId == that.correlationId &&
-            Objects.equals(requestItem, that.requestItem) &&
-            Objects.equals(plcRequestContainer, that.plcRequestContainer);
-    }
-
-    @Override
-    public int hashCode() {
-
-        return Objects.hash(correlationId, requestItem, plcRequestContainer);
-    }
-
-    @Override
-    public String toString() {
-        return "CorrelatedRequestItem{" +
-            "correlationId=" + correlationId +
-            ", requestItem=" + requestItem +
-            ", plcRequestContainer=" + plcRequestContainer +
-            '}';
-    }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
deleted file mode 100644
index 38a9032..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
- 
-   http://www.apache.org/licenses/LICENSE-2.0
- 
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.items.ResponseItem;
-
-import java.util.Objects;
-
-public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
-
-    private final int correlationId;
-
-    private final RESPONSE_ITEM responseItem;
-
-    public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) {
-        this.correlationId = correlationId;
-        this.responseItem = responseItem;
-    }
-
-    public int getCorrelationId() {
-        return correlationId;
-    }
-
-    public RESPONSE_ITEM getResponseItem() {
-        return responseItem;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof CorrelatedResponseItem)) {
-            return false;
-        }
-        CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
-        return correlationId == that.correlationId &&
-            Objects.equals(responseItem, that.responseItem);
-    }
-
-    @Override
-    public int hashCode() {
-
-        return Objects.hash(correlationId, responseItem);
-    }
-
-    @Override
-    public String toString() {
-        return "CorrelatedResponseItem{" +
-            "correlationId=" + correlationId +
-            ", responseItem=" + responseItem +
-            '}';
-    }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 8c0e272..267f9a8 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -19,52 +19,39 @@
 package org.apache.plc4x.java.base.protocol;
 
 import io.netty.channel.*;
-import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.PromiseCombiner;
-import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
-import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
-import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
+// TODO: write test
 public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
 
     private PendingWriteQueue queue;
 
-    private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems;
+    private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
 
     private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
 
-    private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered;
+    private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDevliered;
 
     private AtomicInteger correlationId;
 
-    private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() {
-
-        @Override
-        protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception {
-            SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
-        }
-    };
-
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         this.queue = new PendingWriteQueue(ctx);
@@ -91,42 +78,49 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     // Decoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        decoder.channelRead(ctx, msg);
-        super.read(ctx);
-    }
-
-    private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException {
-        int correlationId = msg.getCorrelationId();
-        CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId);
-        if (correlatedRequestItem == null) {
-            throw new PlcProtocolException("Unrelated package received " + msg);
+    private void tryFinish(int correlationId, InternalPlcResponse msg) {
+        PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
+        if (plcRequestContainer == null) {
+            throw new PlcRuntimeException("Unrelated package received " + msg);
         }
-        PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer();
-        List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+        List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
         correlatedResponseItems.add(msg);
         Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
         integers.remove(correlationId);
         if (integers.isEmpty()) {
-            PlcResponse<?, ?, ?> plcResponse;
-            if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) {
-                TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest();
-                plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) {
-                plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) {
-                plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
-            } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) {
-                plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+            InternalPlcResponse<?> plcResponse;
+            if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
+                InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+                HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
+
+                correlatedResponseItems.stream()
+                    .map(InternalPlcReadResponse.class::cast)
+                    .map(InternalPlcReadResponse::getValues)
+                    .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
+
+                plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields);
+            } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
+                InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest();
+                HashMap<String, PlcResponseCode> values = new HashMap<>();
+
+                correlatedResponseItems.stream()
+                    .map(InternalPlcWriteResponse.class::cast)
+                    .map(InternalPlcWriteResponse::getValues)
+                    .forEach(stringPairMap -> stringPairMap.forEach(values::put));
+
+                plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
             } else {
-                throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest());
+                throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest());
             }
             plcRequestContainer.getResponseFuture().complete(plcResponse);
             responsesToBeDevliered.remove(plcRequestContainer);
         }
     }
 
+    private void errored(int correlationId, Throwable throwable) {
+
+    }
+
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
     // Encoding
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -134,22 +128,68 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         if (msg instanceof PlcRequestContainer) {
-            PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+            @SuppressWarnings("unchecked")
+            PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
             Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
 
             // Create a promise that has to be called multiple times.
             PromiseCombiner promiseCombiner = new PromiseCombiner();
-            PlcRequest<?> request = in.getRequest();
-            for (RequestItem<?> item : request.getRequestItems()) {
-                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+            InternalPlcRequest request = in.getRequest();
+            if (request instanceof InternalPlcFieldRequest) {
+                InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
+
+                if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+                    InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
+                    // TODO: repackage
+                    internalPlcReadRequest.getNamedFields().forEach(field -> {
+                        ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                        int tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+                            .thenApply(InternalPlcResponse.class::cast)
+                            .whenComplete((internalPlcResponse, throwable) -> {
+                                if (throwable != null) {
+                                    errored(tdpu, throwable);
+                                } else {
+                                    tryFinish(tdpu, internalPlcResponse);
+                                }
+                            });
+                        queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise);
+                        if (!tdpus.add(tdpu)) {
+                            throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        }
+                        promiseCombiner.add((Future) subPromise);
+                    });
+                }
+                if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+                    InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
+                    // TODO: repackage
+                    internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
+                        ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
 
-                int tdpu = correlationId.getAndIncrement();
-                queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise);
-                if (!tdpus.add(tdpu)) {
-                    throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        int tdpu = correlationId.getAndIncrement();
+                        CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+                            .thenApply(InternalPlcResponse.class::cast)
+                            .whenComplete((internalPlcResponse, throwable) -> {
+                                if (throwable != null) {
+                                    errored(tdpu, throwable);
+                                } else {
+                                    tryFinish(tdpu, internalPlcResponse);
+                                }
+                            });
+                        queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise);
+                        if (!tdpus.add(tdpu)) {
+                            throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                        }
+                        promiseCombiner.add((Future) subPromise);
+                    });
                 }
+            } else {
+                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+                queue.add(msg, subPromise);
                 promiseCombiner.add((Future) subPromise);
             }
+
             promiseCombiner.finish(promise);
 
             // Start sending the queue content.
@@ -166,11 +206,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
     protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
         while (queue.size() > 0) {
             // Get the RequestItem that is up next in the queue.
-            CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current();
+            PlcRequestContainer currentItem = (PlcRequestContainer) queue.current();
+            InternalPlcRequest request = currentItem.getRequest();
 
-            if (currentItem == null) {
-                break;
-            }
             // Send the TPDU.
             try {
                 ChannelFuture channelFuture = queue.removeAndWrite();
@@ -183,11 +221,62 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
                 ctx.fireExceptionCaught(e);
             }
 
-            // Add it to the list of sentButUnacknowledgedRequestItems.
-            sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem);
+            if (request instanceof CorrelatedPlcRequest) {
+                CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request;
+
+                // Add it to the list of sentButUnacknowledgedRequestItems.
+                sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem);
 
-            LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId());
+                LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu());
+            }
         }
         ctx.flush();
     }
+
+    interface CorrelatedPlcRequest extends InternalPlcRequest {
+
+        int getTdpu();
+    }
+
+    private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
+
+        private final int tdpu;
+
+        public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
+            super(fields);
+            this.tdpu = tdpu;
+        }
+
+        public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+            LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+            fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
+            return new CorrelatedPlcReadRequest(fields, tdpu);
+        }
+
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
+
+    private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+
+        private final int tdpu;
+
+        public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+            super(fields);
+            this.tdpu = tdpu;
+        }
+
+        public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+            LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
+            fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
+            return new CorrelatedPlcWriteRequest(fields, tdpu);
+        }
+
+        @Override
+        public int getTdpu() {
+            return tdpu;
+        }
+    }
 }