You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/27 10:15:53 UTC
[incubator-plc4x] 02/07: [General] some progress on the SingleItemToSingleRequestProtocol
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit a53cbbf8e022409ef4c3aaa79c93b029e39375bf
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Sep 26 18:01:46 2018 +0200
[General] some progress on the SingleItemToSingleRequestProtocol
---
.../base/messages/item/CorrelatedRequestItem.java | 81 --------
.../base/messages/item/CorrelatedResponseItem.java | 70 -------
.../SingleItemToSingleRequestProtocol.java | 217 +++++++++++++++------
3 files changed, 153 insertions(+), 215 deletions(-)
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
deleted file mode 100644
index 5212618..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
-import org.apache.plc4x.java.api.messages.PlcResponse;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-
-import java.util.Objects;
-
-public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
-
- private final int correlationId;
-
- private final REQUEST_ITEM requestItem;
-
- private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer;
-
- public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
- this.correlationId = correlationId;
- this.requestItem = requestItem;
- this.plcRequestContainer = plcRequestContainer;
- }
-
- public int getCorrelationId() {
- return correlationId;
- }
-
- public REQUEST_ITEM getRequestItem() {
- return requestItem;
- }
-
- public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() {
- return plcRequestContainer;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof CorrelatedRequestItem)) {
- return false;
- }
- CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
- return correlationId == that.correlationId &&
- Objects.equals(requestItem, that.requestItem) &&
- Objects.equals(plcRequestContainer, that.plcRequestContainer);
- }
-
- @Override
- public int hashCode() {
-
- return Objects.hash(correlationId, requestItem, plcRequestContainer);
- }
-
- @Override
- public String toString() {
- return "CorrelatedRequestItem{" +
- "correlationId=" + correlationId +
- ", requestItem=" + requestItem +
- ", plcRequestContainer=" + plcRequestContainer +
- '}';
- }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
deleted file mode 100644
index 38a9032..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.items.ResponseItem;
-
-import java.util.Objects;
-
-public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
-
- private final int correlationId;
-
- private final RESPONSE_ITEM responseItem;
-
- public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) {
- this.correlationId = correlationId;
- this.responseItem = responseItem;
- }
-
- public int getCorrelationId() {
- return correlationId;
- }
-
- public RESPONSE_ITEM getResponseItem() {
- return responseItem;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof CorrelatedResponseItem)) {
- return false;
- }
- CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
- return correlationId == that.correlationId &&
- Objects.equals(responseItem, that.responseItem);
- }
-
- @Override
- public int hashCode() {
-
- return Objects.hash(correlationId, responseItem);
- }
-
- @Override
- public String toString() {
- return "CorrelatedResponseItem{" +
- "correlationId=" + correlationId +
- ", responseItem=" + responseItem +
- '}';
- }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 8c0e272..267f9a8 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -19,52 +19,39 @@
package org.apache.plc4x.java.base.protocol;
import io.netty.channel.*;
-import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.PromiseCombiner;
-import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
-import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
-import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
+// TODO: write test
public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
private PendingWriteQueue queue;
- private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems;
+ private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
- private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered;
+ private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDevliered;
private AtomicInteger correlationId;
- private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() {
-
- @Override
- protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception {
- SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
- }
- };
-
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.queue = new PendingWriteQueue(ctx);
@@ -91,42 +78,49 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
// Decoding
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- decoder.channelRead(ctx, msg);
- super.read(ctx);
- }
-
- private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException {
- int correlationId = msg.getCorrelationId();
- CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId);
- if (correlatedRequestItem == null) {
- throw new PlcProtocolException("Unrelated package received " + msg);
+ private void tryFinish(int correlationId, InternalPlcResponse msg) {
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
+ if (plcRequestContainer == null) {
+ throw new PlcRuntimeException("Unrelated package received " + msg);
}
- PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer();
- List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+ List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
correlatedResponseItems.add(msg);
Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
integers.remove(correlationId);
if (integers.isEmpty()) {
- PlcResponse<?, ?, ?> plcResponse;
- if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) {
- TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest();
- plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
- } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) {
- plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
- } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) {
- plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
- } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) {
- plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+ InternalPlcResponse<?> plcResponse;
+ if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
+ InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+ HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
+
+ correlatedResponseItems.stream()
+ .map(InternalPlcReadResponse.class::cast)
+ .map(InternalPlcReadResponse::getValues)
+ .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
+
+ plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields);
+ } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
+ InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest();
+ HashMap<String, PlcResponseCode> values = new HashMap<>();
+
+ correlatedResponseItems.stream()
+ .map(InternalPlcWriteResponse.class::cast)
+ .map(InternalPlcWriteResponse::getValues)
+ .forEach(stringPairMap -> stringPairMap.forEach(values::put));
+
+ plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
} else {
- throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest());
+ throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest());
}
plcRequestContainer.getResponseFuture().complete(plcResponse);
responsesToBeDevliered.remove(plcRequestContainer);
}
}
+ private void errored(int correlationId, Throwable throwable) {
+
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Encoding
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -134,22 +128,68 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof PlcRequestContainer) {
- PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+ @SuppressWarnings("unchecked")
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
// Create a promise that has to be called multiple times.
PromiseCombiner promiseCombiner = new PromiseCombiner();
- PlcRequest<?> request = in.getRequest();
- for (RequestItem<?> item : request.getRequestItems()) {
- ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+ InternalPlcRequest request = in.getRequest();
+ if (request instanceof InternalPlcFieldRequest) {
+ InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
+
+ if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+ InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
+ // TODO: repackage
+ internalPlcReadRequest.getNamedFields().forEach(field -> {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+ int tdpu = correlationId.getAndIncrement();
+ CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+ .thenApply(InternalPlcResponse.class::cast)
+ .whenComplete((internalPlcResponse, throwable) -> {
+ if (throwable != null) {
+ errored(tdpu, throwable);
+ } else {
+ tryFinish(tdpu, internalPlcResponse);
+ }
+ });
+ queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise);
+ if (!tdpus.add(tdpu)) {
+ throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ }
+ promiseCombiner.add((Future) subPromise);
+ });
+ }
+ if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+ InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
+ // TODO: repackage
+ internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
- int tdpu = correlationId.getAndIncrement();
- queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise);
- if (!tdpus.add(tdpu)) {
- throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ int tdpu = correlationId.getAndIncrement();
+ CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+ .thenApply(InternalPlcResponse.class::cast)
+ .whenComplete((internalPlcResponse, throwable) -> {
+ if (throwable != null) {
+ errored(tdpu, throwable);
+ } else {
+ tryFinish(tdpu, internalPlcResponse);
+ }
+ });
+ queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise);
+ if (!tdpus.add(tdpu)) {
+ throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ }
+ promiseCombiner.add((Future) subPromise);
+ });
}
+ } else {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+ queue.add(msg, subPromise);
promiseCombiner.add((Future) subPromise);
}
+
promiseCombiner.finish(promise);
// Start sending the queue content.
@@ -166,11 +206,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
while (queue.size() > 0) {
// Get the RequestItem that is up next in the queue.
- CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current();
+ PlcRequestContainer currentItem = (PlcRequestContainer) queue.current();
+ InternalPlcRequest request = currentItem.getRequest();
- if (currentItem == null) {
- break;
- }
// Send the TPDU.
try {
ChannelFuture channelFuture = queue.removeAndWrite();
@@ -183,11 +221,62 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
ctx.fireExceptionCaught(e);
}
- // Add it to the list of sentButUnacknowledgedRequestItems.
- sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem);
+ if (request instanceof CorrelatedPlcRequest) {
+ CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request;
+
+ // Add it to the list of sentButUnacknowledgedRequestItems.
+ sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem);
- LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId());
+ LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu());
+ }
}
ctx.flush();
}
+
+ interface CorrelatedPlcRequest extends InternalPlcRequest {
+
+ int getTdpu();
+ }
+
+ private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
+
+ private final int tdpu;
+
+ public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
+ super(fields);
+ this.tdpu = tdpu;
+ }
+
+ public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+ LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+ fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
+ return new CorrelatedPlcReadRequest(fields, tdpu);
+ }
+
+ @Override
+ public int getTdpu() {
+ return tdpu;
+ }
+ }
+
+ private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+
+ private final int tdpu;
+
+ public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+ super(fields);
+ this.tdpu = tdpu;
+ }
+
+ public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+ LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
+ fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
+ return new CorrelatedPlcWriteRequest(fields, tdpu);
+ }
+
+ @Override
+ public int getTdpu() {
+ return tdpu;
+ }
+ }
}