You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/07/05 11:20:51 UTC

[incubator-plc4x] branch feature/TopLevelItemSpliting created (now 33ab0ab)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


      at 33ab0ab  Introduced protocol layer to split requestItems to several requests.

This branch includes the following new commits:

     new 369ee00  small fixes in the modbus protocol
     new 33ab0ab  Introduced protocol layer to split requestItems to several requests.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-plc4x] 02/02: Introduced protocol layer to split requestItems to several requests.

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 33ab0ab10e655c26bb2e38c71cb941b3db4fe5e1
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jul 5 13:20:43 2018 +0200

    Introduced protocol layer to split requestItems to several requests.
---
 .../base/messages/item/CorrelatedRequestItem.java  |  81 +++++++++
 .../base/messages/item/CorrelatedResponseItem.java |  70 ++++++++
 .../SingleItemToSingleRequestProtocol.java         | 193 +++++++++++++++++++++
 3 files changed, 344 insertions(+)

diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
new file mode 100644
index 0000000..5212618
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
@@ -0,0 +1,81 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.messages.item;
+
+import org.apache.plc4x.java.api.messages.PlcRequestContainer;
+import org.apache.plc4x.java.api.messages.PlcResponse;
+import org.apache.plc4x.java.api.messages.items.RequestItem;
+
+import java.util.Objects;
+
+public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
+
+    private final int correlationId;
+
+    private final REQUEST_ITEM requestItem;
+
+    private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer;
+
+    public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
+        this.correlationId = correlationId;
+        this.requestItem = requestItem;
+        this.plcRequestContainer = plcRequestContainer;
+    }
+
+    public int getCorrelationId() {
+        return correlationId;
+    }
+
+    public REQUEST_ITEM getRequestItem() {
+        return requestItem;
+    }
+
+    public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() {
+        return plcRequestContainer;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof CorrelatedRequestItem)) {
+            return false;
+        }
+        CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
+        return correlationId == that.correlationId &&
+            Objects.equals(requestItem, that.requestItem) &&
+            Objects.equals(plcRequestContainer, that.plcRequestContainer);
+    }
+
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(correlationId, requestItem, plcRequestContainer);
+    }
+
+    @Override
+    public String toString() {
+        return "CorrelatedRequestItem{" +
+            "correlationId=" + correlationId +
+            ", requestItem=" + requestItem +
+            ", plcRequestContainer=" + plcRequestContainer +
+            '}';
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
new file mode 100644
index 0000000..38a9032
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
@@ -0,0 +1,70 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ 
+   http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.messages.item;
+
+import org.apache.plc4x.java.api.messages.items.ResponseItem;
+
+import java.util.Objects;
+
+public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
+
+    private final int correlationId;
+
+    private final RESPONSE_ITEM responseItem;
+
+    public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) {
+        this.correlationId = correlationId;
+        this.responseItem = responseItem;
+    }
+
+    public int getCorrelationId() {
+        return correlationId;
+    }
+
+    public RESPONSE_ITEM getResponseItem() {
+        return responseItem;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof CorrelatedResponseItem)) {
+            return false;
+        }
+        CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
+        return correlationId == that.correlationId &&
+            Objects.equals(responseItem, that.responseItem);
+    }
+
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(correlationId, responseItem);
+    }
+
+    @Override
+    public String toString() {
+        return "CorrelatedResponseItem{" +
+            "correlationId=" + correlationId +
+            ", responseItem=" + responseItem +
+            '}';
+    }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
new file mode 100644
index 0000000..8c0e272
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -0,0 +1,193 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.base.protocol;
+
+import io.netty.channel.*;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.PromiseCombiner;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.RequestItem;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
+import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
+import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
+import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Netty duplex handler that fans a {@link PlcRequestContainer} out into one {@link CorrelatedRequestItem} per
+ * request item on the outbound side and, on the inbound side, gathers the {@link CorrelatedResponseItem}s that
+ * come back until every item of a container has been answered. Once that is the case a single response is
+ * assembled and the container's response future is completed with it.
+ */
+public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
+
+    // Outgoing items accepted by write() that have not been handed to the next handler yet.
+    private PendingWriteQueue queue;
+
+    // Items that were written downstream, keyed by correlation id, still waiting for their response item.
+    private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems;
+
+    // Correlation ids that are still outstanding for each request container.
+    private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
+
+    // Response items collected so far for each request container.
+    private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDelivered;
+
+    // Source of correlation ids; one id is drawn per request item.
+    private AtomicInteger correlationId;
+
+    // Delegate that filters inbound messages by type and forwards CorrelatedResponseItems to decode(...).
+    private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() {
+
+        @Override
+        protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception {
+            SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
+        }
+    };
+
+    /**
+     * (Re-)creates all per-channel state before passing the event on.
+     */
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        this.queue = new PendingWriteQueue(ctx);
+        this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>();
+        this.containerCorrelationIdMap = new ConcurrentHashMap<>();
+        // Was never initialised before, which made the first decode(...) call fail with a NullPointerException.
+        this.responsesToBeDelivered = new ConcurrentHashMap<>();
+        this.correlationId = new AtomicInteger();
+        super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        this.queue.removeAndWriteAll();
+        super.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // Send everything so we get a proper failure for those pending writes
+        this.queue.removeAndWriteAll();
+        super.channelInactive(ctx);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Decoding
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        decoder.channelRead(ctx, msg);
+        // NOTE(review): read() is an outbound operation that requests more data from the channel;
+        // confirm that triggering it after every inbound message is really intended.
+        super.read(ctx);
+    }
+
+    /**
+     * Matches an incoming response item with the request item it answers and, once all items of the owning
+     * request container have been answered, completes the container's response future with a response built
+     * from the collected items. Nothing is added to {@code out}.
+     *
+     * @param ctx channel handler context (not used)
+     * @param msg the incoming response item
+     * @param out decoder output list (left untouched)
+     * @throws PlcProtocolException if no sent request item is known for the correlation id of {@code msg},
+     *                              or if the request of the owning container is of an unsupported type
+     */
+    private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException {
+        int correlationId = msg.getCorrelationId();
+        CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId);
+        if (correlatedRequestItem == null) {
+            throw new PlcProtocolException("Unrelated package received " + msg);
+        }
+        PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer();
+        List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+        correlatedResponseItems.add(msg);
+        Set<Integer> outstandingIds = containerCorrelationIdMap.get(plcRequestContainer);
+        outstandingIds.remove(correlationId);
+        if (!outstandingIds.isEmpty()) {
+            // Still waiting for further items of this container.
+            return;
+        }
+
+        // The container is fully answered: drop its bookkeeping first so that nothing is left behind,
+        // even if building the response fails below.
+        containerCorrelationIdMap.remove(plcRequestContainer);
+        responsesToBeDelivered.remove(plcRequestContainer);
+
+        PlcRequest<?> request = plcRequestContainer.getRequest();
+        // Raw on purpose: each response constructor below expects a differently parameterized item list.
+        List responseItems = correlatedResponseItems.stream()
+            .map(correlatedResponseItem -> correlatedResponseItem.getResponseItem())
+            .collect(Collectors.toList());
+
+        // The type-safe variants are checked first, exactly as before.
+        PlcResponse<?, ?, ?> plcResponse;
+        if (request instanceof TypeSafePlcReadRequest) {
+            plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) request, responseItems);
+        } else if (request instanceof TypeSafePlcWriteRequest) {
+            plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) request, responseItems);
+        } else if (request instanceof PlcReadRequest) {
+            plcResponse = new PlcReadResponse((PlcReadRequest) request, responseItems);
+        } else if (request instanceof PlcWriteRequest) {
+            plcResponse = new PlcWriteResponse((PlcWriteRequest) request, responseItems);
+        } else {
+            throw new PlcProtocolException("Unknown type detected " + request);
+        }
+        plcRequestContainer.getResponseFuture().complete(plcResponse);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Encoding
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Splits a {@link PlcRequestContainer} into one queued {@link CorrelatedRequestItem} per request item and
+     * starts sending the queue. The given promise is notified through a {@link PromiseCombiner} fed with the
+     * promises of the individual items. Any other message type is passed on unchanged.
+     */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof PlcRequestContainer) {
+            PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+            Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
+
+            // Create a promise that has to be called multiple times.
+            PromiseCombiner promiseCombiner = new PromiseCombiner();
+            PlcRequest<?> request = in.getRequest();
+            for (RequestItem<?> item : request.getRequestItems()) {
+                ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+                int tdpu = correlationId.getAndIncrement();
+                queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise);
+                if (!tdpus.add(tdpu)) {
+                    throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+                }
+                promiseCombiner.add((Future) subPromise);
+            }
+            promiseCombiner.finish(promise);
+
+            // Start sending the queue content.
+            trySendingMessages(ctx);
+        } else {
+            super.write(ctx, msg, promise);
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Helpers
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Drains the pending write queue: every queued item is written and flushed, then recorded in
+     * {@code sentButUnacknowledgedRequestItems} so that a later response item can be correlated with it.
+     *
+     * @param ctx context used for flushing and for reporting exceptions
+     */
+    protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
+        while (queue.size() > 0) {
+            // Get the RequestItem that is up next in the queue.
+            CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current();
+
+            if (currentItem == null) {
+                break;
+            }
+            // Send the TPDU.
+            try {
+                ChannelFuture channelFuture = queue.removeAndWrite();
+                ctx.flush();
+                if (channelFuture == null) {
+                    break;
+                }
+            } catch (Exception e) {
+                LOGGER.error("Error sending more queues messages", e);
+                ctx.fireExceptionCaught(e);
+            }
+
+            // Add it to the list of sentButUnacknowledgedRequestItems.
+            sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem);
+
+            LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId());
+        }
+        ctx.flush();
+    }
+}


[incubator-plc4x] 01/02: small fixes in the modbus protocol

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 369ee00c4be28c59d0ad785ae4dbe208bc63d3ab
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jul 5 10:02:12 2018 +0200

    small fixes in the modbus protocol
---
 .../org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
index 8cbce82..288cda7 100644
--- a/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
+++ b/plc4j/protocols/modbus/src/main/java/org/apache/plc4x/java/modbus/netty/Plc4XModbusProtocol.java
@@ -70,7 +70,7 @@ public class Plc4XModbusProtocol extends MessageToMessageCodec<ModbusTcpPayload,
     private void encodeWriteRequest(PlcRequestContainer<PlcRequest, PlcResponse> msg, List<Object> out) throws PlcException {
         PlcWriteRequest request = (PlcWriteRequest) msg.getRequest();
         // TODO: support multiple requests
-        WriteRequestItem<?> writeRequestItem = request.getRequestItem().get();
+        WriteRequestItem<?> writeRequestItem = request.getRequestItem().orElseThrow(() -> new PlcProtocolException("Only single message supported for now"));
         // TODO: check if we can map like this. Implication is that we can only work with int, short, byte and boolean
         // TODO: for higher datatypes float, double etc we might need to split the bytes into chunks
         int quantity = writeRequestItem.getSize();
@@ -110,7 +110,7 @@ public class Plc4XModbusProtocol extends MessageToMessageCodec<ModbusTcpPayload,
     private void encodeReadRequest(PlcRequestContainer<PlcRequest, PlcResponse> msg, List<Object> out) throws PlcException {
         PlcReadRequest request = (PlcReadRequest) msg.getRequest();
         // TODO: support multiple requests
-        ReadRequestItem<?> readRequestItem = request.getRequestItem().get();
+        ReadRequestItem<?> readRequestItem = request.getRequestItem().orElseThrow(() -> new PlcProtocolException("Only single message supported for now"));
         // TODO: check if we can map like this. Implication is that we can only work with int, short, byte and boolean
         // TODO: for higher datatypes float, double etc we might need to split the bytes into chunks
         int quantity = readRequestItem.getSize();
@@ -155,8 +155,8 @@ public class Plc4XModbusProtocol extends MessageToMessageCodec<ModbusTcpPayload,
         }
 
         // TODO: only single Item supported for now
-        PlcRequest request = plcRequestContainer.getRequest();
-        RequestItem requestItem = (RequestItem) request.getRequestItem().get();
+        PlcRequest<?> request = plcRequestContainer.getRequest();
+        RequestItem requestItem = request.getRequestItem().orElseThrow(() -> new PlcProtocolException("Only single message supported for now"));
         Class datatype = requestItem.getDatatype();
 
         ModbusPdu modbusPdu = msg.getModbusPdu();