You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/26 16:11:20 UTC
[incubator-plc4x] 02/03: [General] some progress on the
SingleItemToSingleRequestProtocol
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 31e2acba2d748ae7d5d9197be9057b707f943151
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Sep 26 18:01:46 2018 +0200
[General] some progress on the SingleItemToSingleRequestProtocol
---
.../java/base/messages/DefaultPlcReadRequest.java | 12 +-
.../java/base/messages/DefaultPlcReadResponse.java | 5 +
.../messages/DefaultPlcSubscriptionRequest.java | 15 +-
.../messages/DefaultPlcUnsubscriptionRequest.java | 6 +
.../java/base/messages/DefaultPlcWriteRequest.java | 28 ++-
.../base/messages/DefaultPlcWriteResponse.java | 5 +
.../base/messages/InternalPlcFieldRequest.java | 6 +
.../java/base/messages/InternalPlcReadRequest.java | 2 +-
.../base/messages/InternalPlcReadResponse.java | 6 +
.../base/messages/InternalPlcWriteRequest.java | 4 +
.../base/messages/InternalPlcWriteResponse.java | 5 +-
.../base/messages/item/CorrelatedRequestItem.java | 81 --------
.../base/messages/item/CorrelatedResponseItem.java | 70 -------
.../SingleItemToSingleRequestProtocol.java | 217 +++++++++++++++------
14 files changed, 238 insertions(+), 224 deletions(-)
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
index ef9d7ba..9b60fe4 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
@@ -18,18 +18,20 @@ under the License.
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.base.connection.PlcFieldHandler;
import java.util.*;
+import java.util.stream.Collectors;
public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPlcFieldRequest {
private LinkedHashMap<String, PlcField> fields;
- private DefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
+ protected DefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
this.fields = fields;
}
@@ -54,6 +56,14 @@ public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPl
return new LinkedList<>(fields.values());
}
+ @Override
+ public LinkedList<Pair<String, PlcField>> getNamedFields() {
+ return fields.entrySet()
+ .stream()
+ .map(stringPlcFieldEntry -> Pair.of(stringPlcFieldEntry.getKey(), stringPlcFieldEntry.getValue()))
+ .collect(Collectors.toCollection(LinkedList::new));
+ }
+
public static class Builder implements PlcReadRequest.Builder {
private final PlcFieldHandler fieldHandler;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java
index cd2dc47..73cad97 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadResponse.java
@@ -76,6 +76,11 @@ public class DefaultPlcReadResponse implements InternalPlcReadResponse {
}
@Override
+ public Map<String, Pair<PlcResponseCode, FieldItem>> getValues() {
+ return values;
+ }
+
+ @Override
public Object getObject(String name) {
return getObject(name, 0);
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index f50f0ad..9f59cb3 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -34,27 +34,32 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
@Override
public int getNumberOfFields() {
- return 0;
+ throw new IllegalStateException("not available");
}
@Override
public LinkedHashSet<String> getFieldNames() {
- return null;
+ throw new IllegalStateException("not available");
}
@Override
public PlcField getField(String name) {
- return null;
+ throw new IllegalStateException("not available");
}
@Override
public LinkedList<PlcField> getFields() {
- return null;
+ throw new IllegalStateException("not available");
}
@Override
public PlcSubscriptionType getPlcSubscriptionType() {
- return null;
+ throw new IllegalStateException("not available");
+ }
+
+ @Override
+ public LinkedList<Pair<String, PlcField>> getNamedFields() {
+ throw new IllegalStateException("not available");
}
public static class Builder implements PlcSubscriptionRequest.Builder {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 38a52f0..5dbccea 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -18,6 +18,7 @@
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
@@ -59,6 +60,11 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
return internalPlcSubscriptionHandles;
}
+ @Override
+ public LinkedList<Pair<String, PlcField>> getNamedFields() {
+ throw new IllegalStateException("not available");
+ }
+
public static class Builder implements PlcUnsubscriptionRequest.Builder {
private List<InternalPlcSubscriptionHandle> plcSubscriptionHandles;
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
index 1eb3458..d7e4086 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.base.messages;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.model.PlcField;
@@ -39,7 +40,7 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
private final LinkedHashMap<String, Pair<PlcField, FieldItem>> fields;
- private DefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+ protected DefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
this.fields = fields;
}
@@ -74,6 +75,31 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
}
@Override
+ public LinkedList<Pair<String, PlcField>> getNamedFields() {
+ return fields.entrySet()
+ .stream()
+ .map(stringPairEntry ->
+ Pair.of(
+ stringPairEntry.getKey(),
+ stringPairEntry.getValue().getKey()
+ )
+ ).collect(Collectors.toCollection(LinkedList::new));
+ }
+
+ @Override
+ public LinkedList<Triple<String, PlcField, FieldItem>> getNamedFieldTriples() {
+ return fields.entrySet()
+ .stream()
+ .map(stringPairEntry ->
+ Triple.of(
+ stringPairEntry.getKey(),
+ stringPairEntry.getValue().getKey(),
+ stringPairEntry.getValue().getValue()
+ )
+ ).collect(Collectors.toCollection(LinkedList::new));
+ }
+
+ @Override
public int getNumberOfValues(String name) {
return fields.get(name).getValue().getNumberOfValues();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java
index e7c5e83..a004f0f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteResponse.java
@@ -35,6 +35,11 @@ public class DefaultPlcWriteResponse implements InternalPlcWriteResponse {
}
@Override
+ public Map<String, PlcResponseCode> getValues() {
+ return values;
+ }
+
+ @Override
public InternalPlcWriteRequest getRequest() {
return request;
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
index 55d508f..a7ee30c 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldRequest.java
@@ -18,8 +18,14 @@
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcFieldRequest;
+import org.apache.plc4x.java.api.model.PlcField;
+
+import java.util.LinkedList;
public interface InternalPlcFieldRequest extends PlcFieldRequest {
+ LinkedList<Pair<String, PlcField>> getNamedFields();
+
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java
index b1cd4b0..c787f00 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadRequest.java
@@ -20,7 +20,7 @@ package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
-public interface InternalPlcReadRequest extends PlcReadRequest, InternalPlcRequest {
+public interface InternalPlcReadRequest extends PlcReadRequest, InternalPlcFieldRequest, InternalPlcRequest {
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
index 27ae68e..775a4c7 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
@@ -18,8 +18,14 @@
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
+
+import java.util.Map;
public interface InternalPlcReadResponse extends PlcReadResponse<InternalPlcReadRequest>, InternalPlcResponse<InternalPlcReadRequest> {
+ Map<String, Pair<PlcResponseCode, FieldItem>> getValues();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java
index a4bad51..f10cd89 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteRequest.java
@@ -18,7 +18,9 @@
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.base.messages.items.FieldItem;
import java.util.LinkedList;
@@ -28,4 +30,6 @@ public interface InternalPlcWriteRequest extends PlcWriteRequest, InternalPlcReq
FieldItem getFieldItem(String name);
LinkedList<FieldItem> getFieldItems();
+
+ LinkedList<Triple<String, PlcField, FieldItem>> getNamedFieldTriples();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
index 9d127a5..dbcdc5c 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
@@ -19,7 +19,10 @@
package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
-public interface InternalPlcWriteResponse extends PlcWriteResponse<InternalPlcWriteRequest>, InternalPlcResponse<InternalPlcWriteRequest> {
+import java.util.Map;
+public interface InternalPlcWriteResponse extends PlcWriteResponse<InternalPlcWriteRequest>, InternalPlcResponse<InternalPlcWriteRequest> {
+ Map<String, PlcResponseCode> getValues();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
deleted file mode 100644
index 5212618..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedRequestItem.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.PlcRequestContainer;
-import org.apache.plc4x.java.api.messages.PlcResponse;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-
-import java.util.Objects;
-
-public class CorrelatedRequestItem<REQUEST_ITEM extends RequestItem<?>> {
-
- private final int correlationId;
-
- private final REQUEST_ITEM requestItem;
-
- private final PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer;
-
- public CorrelatedRequestItem(int correlationId, REQUEST_ITEM requestItem, PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer) {
- this.correlationId = correlationId;
- this.requestItem = requestItem;
- this.plcRequestContainer = plcRequestContainer;
- }
-
- public int getCorrelationId() {
- return correlationId;
- }
-
- public REQUEST_ITEM getRequestItem() {
- return requestItem;
- }
-
- public PlcRequestContainer<?, PlcResponse<?, ?, ?>> getPlcRequestContainer() {
- return plcRequestContainer;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof CorrelatedRequestItem)) {
- return false;
- }
- CorrelatedRequestItem<?> that = (CorrelatedRequestItem<?>) o;
- return correlationId == that.correlationId &&
- Objects.equals(requestItem, that.requestItem) &&
- Objects.equals(plcRequestContainer, that.plcRequestContainer);
- }
-
- @Override
- public int hashCode() {
-
- return Objects.hash(correlationId, requestItem, plcRequestContainer);
- }
-
- @Override
- public String toString() {
- return "CorrelatedRequestItem{" +
- "correlationId=" + correlationId +
- ", requestItem=" + requestItem +
- ", plcRequestContainer=" + plcRequestContainer +
- '}';
- }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
deleted file mode 100644
index 38a9032..0000000
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/item/CorrelatedResponseItem.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- */
-package org.apache.plc4x.java.base.messages.item;
-
-import org.apache.plc4x.java.api.messages.items.ResponseItem;
-
-import java.util.Objects;
-
-public class CorrelatedResponseItem<RESPONSE_ITEM extends ResponseItem<?>> {
-
- private final int correlationId;
-
- private final RESPONSE_ITEM responseItem;
-
- public CorrelatedResponseItem(int correlationId, RESPONSE_ITEM responseItem) {
- this.correlationId = correlationId;
- this.responseItem = responseItem;
- }
-
- public int getCorrelationId() {
- return correlationId;
- }
-
- public RESPONSE_ITEM getResponseItem() {
- return responseItem;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof CorrelatedResponseItem)) {
- return false;
- }
- CorrelatedResponseItem<?> that = (CorrelatedResponseItem<?>) o;
- return correlationId == that.correlationId &&
- Objects.equals(responseItem, that.responseItem);
- }
-
- @Override
- public int hashCode() {
-
- return Objects.hash(correlationId, responseItem);
- }
-
- @Override
- public String toString() {
- return "CorrelatedResponseItem{" +
- "correlationId=" + correlationId +
- ", responseItem=" + responseItem +
- '}';
- }
-}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 8c0e272..267f9a8 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -19,52 +19,39 @@
package org.apache.plc4x.java.base.protocol;
import io.netty.channel.*;
-import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.PromiseCombiner;
-import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
-import org.apache.plc4x.java.api.messages.*;
-import org.apache.plc4x.java.api.messages.items.RequestItem;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteRequest;
-import org.apache.plc4x.java.api.messages.specific.TypeSafePlcWriteResponse;
-import org.apache.plc4x.java.base.messages.item.CorrelatedRequestItem;
-import org.apache.plc4x.java.base.messages.item.CorrelatedResponseItem;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
+// TODO: write test
public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
private PendingWriteQueue queue;
- private ConcurrentMap<Integer, CorrelatedRequestItem<?>> sentButUnacknowledgedRequestItems;
+ private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
- private ConcurrentMap<PlcRequestContainer<?, ?>, List<CorrelatedResponseItem<?>>> responsesToBeDevliered;
+ private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDevliered;
private AtomicInteger correlationId;
- private final MessageToMessageDecoder<CorrelatedResponseItem> decoder = new MessageToMessageDecoder<CorrelatedResponseItem>() {
-
- @Override
- protected void decode(ChannelHandlerContext ctx, CorrelatedResponseItem msg, List<Object> out) throws Exception {
- SingleItemToSingleRequestProtocol.this.decode(ctx, msg, out);
- }
- };
-
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.queue = new PendingWriteQueue(ctx);
@@ -91,42 +78,49 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
// Decoding
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- decoder.channelRead(ctx, msg);
- super.read(ctx);
- }
-
- private void decode(ChannelHandlerContext ctx, CorrelatedResponseItem<?> msg, List<Object> out) throws PlcProtocolException {
- int correlationId = msg.getCorrelationId();
- CorrelatedRequestItem<?> correlatedRequestItem = sentButUnacknowledgedRequestItems.remove(correlationId);
- if (correlatedRequestItem == null) {
- throw new PlcProtocolException("Unrelated package received " + msg);
+ private void tryFinish(int correlationId, InternalPlcResponse msg) {
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
+ if (plcRequestContainer == null) {
+ throw new PlcRuntimeException("Unrelated package received " + msg);
}
- PlcRequestContainer<?, PlcResponse<?, ?, ?>> plcRequestContainer = correlatedRequestItem.getPlcRequestContainer();
- List<CorrelatedResponseItem<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+ List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDevliered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
correlatedResponseItems.add(msg);
Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
integers.remove(correlationId);
if (integers.isEmpty()) {
- PlcResponse<?, ?, ?> plcResponse;
- if (plcRequestContainer.getRequest() instanceof TypeSafePlcReadRequest) {
- TypeSafePlcReadRequest typeSafePlcReadRequest = (TypeSafePlcReadRequest) plcRequestContainer.getRequest();
- plcResponse = new TypeSafePlcReadResponse((TypeSafePlcReadRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
- } else if (plcRequestContainer.getRequest() instanceof TypeSafePlcWriteRequest) {
- plcResponse = new TypeSafePlcWriteResponse((TypeSafePlcWriteRequest<?>) plcRequestContainer.getRequest(), correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
- } else if (plcRequestContainer.getRequest() instanceof PlcReadRequest) {
- plcResponse = new PlcReadResponse((PlcReadRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
- } else if (plcRequestContainer.getRequest() instanceof PlcWriteRequest) {
- plcResponse = new PlcWriteResponse((PlcWriteRequest) plcRequestContainer.getRequest(), (List) correlatedResponseItems.stream().map(correlatedResponseItem -> correlatedResponseItem.getResponseItem()).collect(Collectors.toList()));
+ InternalPlcResponse<?> plcResponse;
+ if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
+ InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+ HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
+
+ correlatedResponseItems.stream()
+ .map(InternalPlcReadResponse.class::cast)
+ .map(InternalPlcReadResponse::getValues)
+ .forEach(stringPairMap -> stringPairMap.forEach(fields::put));
+
+ plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields);
+ } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
+ InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest();
+ HashMap<String, PlcResponseCode> values = new HashMap<>();
+
+ correlatedResponseItems.stream()
+ .map(InternalPlcWriteResponse.class::cast)
+ .map(InternalPlcWriteResponse::getValues)
+ .forEach(stringPairMap -> stringPairMap.forEach(values::put));
+
+ plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
} else {
- throw new PlcProtocolException("Unknown type detected " + plcRequestContainer.getRequest());
+ throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest());
}
plcRequestContainer.getResponseFuture().complete(plcResponse);
responsesToBeDevliered.remove(plcRequestContainer);
}
}
+ private void errored(int correlationId, Throwable throwable) {
+
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Encoding
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -134,22 +128,68 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof PlcRequestContainer) {
- PlcRequestContainer<?, PlcResponse<?, ?, ?>> in = (PlcRequestContainer<?, PlcResponse<?, ?, ?>>) msg;
+ @SuppressWarnings("unchecked")
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
// Create a promise that has to be called multiple times.
PromiseCombiner promiseCombiner = new PromiseCombiner();
- PlcRequest<?> request = in.getRequest();
- for (RequestItem<?> item : request.getRequestItems()) {
- ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+ InternalPlcRequest request = in.getRequest();
+ if (request instanceof InternalPlcFieldRequest) {
+ InternalPlcFieldRequest internalPlcFieldRequest = (InternalPlcFieldRequest) request;
+
+ if (internalPlcFieldRequest instanceof InternalPlcReadRequest) {
+ InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) internalPlcFieldRequest;
+ // TODO: repackage
+ internalPlcReadRequest.getNamedFields().forEach(field -> {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+
+ int tdpu = correlationId.getAndIncrement();
+ CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+ .thenApply(InternalPlcResponse.class::cast)
+ .whenComplete((internalPlcResponse, throwable) -> {
+ if (throwable != null) {
+ errored(tdpu, throwable);
+ } else {
+ tryFinish(tdpu, internalPlcResponse);
+ }
+ });
+ queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise);
+ if (!tdpus.add(tdpu)) {
+ throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ }
+ promiseCombiner.add((Future) subPromise);
+ });
+ }
+ if (internalPlcFieldRequest instanceof InternalPlcWriteRequest) {
+ InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) internalPlcFieldRequest;
+ // TODO: repackage
+ internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
- int tdpu = correlationId.getAndIncrement();
- queue.add(new CorrelatedRequestItem<>(tdpu, item, in), subPromise);
- if (!tdpus.add(tdpu)) {
- throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ int tdpu = correlationId.getAndIncrement();
+ CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+ .thenApply(InternalPlcResponse.class::cast)
+ .whenComplete((internalPlcResponse, throwable) -> {
+ if (throwable != null) {
+ errored(tdpu, throwable);
+ } else {
+ tryFinish(tdpu, internalPlcResponse);
+ }
+ });
+ queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise);
+ if (!tdpus.add(tdpu)) {
+ throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
+ }
+ promiseCombiner.add((Future) subPromise);
+ });
}
+ } else {
+ ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
+ queue.add(msg, subPromise);
promiseCombiner.add((Future) subPromise);
}
+
promiseCombiner.finish(promise);
// Start sending the queue content.
@@ -166,11 +206,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
while (queue.size() > 0) {
// Get the RequestItem that is up next in the queue.
- CorrelatedRequestItem<?> currentItem = (CorrelatedRequestItem) queue.current();
+ PlcRequestContainer currentItem = (PlcRequestContainer) queue.current();
+ InternalPlcRequest request = currentItem.getRequest();
- if (currentItem == null) {
- break;
- }
// Send the TPDU.
try {
ChannelFuture channelFuture = queue.removeAndWrite();
@@ -183,11 +221,62 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
ctx.fireExceptionCaught(e);
}
- // Add it to the list of sentButUnacknowledgedRequestItems.
- sentButUnacknowledgedRequestItems.put(currentItem.getCorrelationId(), currentItem);
+ if (request instanceof CorrelatedPlcRequest) {
+ CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request;
+
+ // Add it to the list of sentButUnacknowledgedRequestItems.
+ sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem);
- LOGGER.debug("Item Message with id {} sent", currentItem.getCorrelationId());
+ LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu());
+ }
}
ctx.flush();
}
+
+ interface CorrelatedPlcRequest extends InternalPlcRequest {
+
+ int getTdpu();
+ }
+
+ private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
+
+ private final int tdpu;
+
+ public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
+ super(fields);
+ this.tdpu = tdpu;
+ }
+
+ public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+ LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+ fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
+ return new CorrelatedPlcReadRequest(fields, tdpu);
+ }
+
+ @Override
+ public int getTdpu() {
+ return tdpu;
+ }
+ }
+
+ private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+
+ private final int tdpu;
+
+ public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+ super(fields);
+ this.tdpu = tdpu;
+ }
+
+ public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+ LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
+ fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
+ return new CorrelatedPlcWriteRequest(fields, tdpu);
+ }
+
+ @Override
+ public int getTdpu() {
+ return tdpu;
+ }
+ }
}