You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/17 16:48:09 UTC
[incubator-plc4x] 02/19: implemented new api in driver base
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit edc56be4efffde229e524be60d548505e48264bf
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Thu Oct 4 11:09:02 2018 +0200
implemented new api in driver base
---
.../messages/DefaultPlcProprietaryRequest.java | 9 +++
.../messages/DefaultPlcProprietaryResponse.java | 6 +-
.../java/base/messages/DefaultPlcReadRequest.java | 22 +++++++-
.../messages/DefaultPlcSubscriptionRequest.java | 20 ++++++-
.../messages/DefaultPlcUnsubscriptionRequest.java | 19 ++++++-
.../java/base/messages/DefaultPlcWriteRequest.java | 18 +++++-
.../base/messages/InternalPlcFieldResponse.java | 2 +-
.../messages/InternalPlcProprietaryResponse.java | 2 +-
.../base/messages/InternalPlcReadResponse.java | 2 +-
.../java/base/messages/InternalPlcResponse.java | 3 +-
.../base/messages/InternalPlcWriteResponse.java | 2 +-
.../SingleItemToSingleRequestProtocol.java | 65 ++++++++++++----------
.../SingleItemToSingleRequestProtocolTest.java | 60 ++++++++++++--------
13 files changed, 160 insertions(+), 70 deletions(-)
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
index fa716f0..4e73899 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryRequest.java
@@ -18,8 +18,17 @@
*/
package org.apache.plc4x.java.base.messages;
+import org.apache.plc4x.java.api.messages.PlcResponse;
+
+import java.util.concurrent.CompletableFuture;
+
public class DefaultPlcProprietaryRequest<REQUEST> implements InternalPlcProprietaryRequest<REQUEST> {
+ @Override
+ public CompletableFuture<PlcResponse> execute() {
+ throw new RuntimeException("not supported"); // TODO: figure out what to do with this
+ }
+
private REQUEST proprietaryRequest;
public DefaultPlcProprietaryRequest(REQUEST proprietaryRequest) {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
index 679fde1..69ee2d0 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcProprietaryResponse.java
@@ -18,13 +18,13 @@
*/
package org.apache.plc4x.java.base.messages;
-public class DefaultPlcProprietaryResponse<REQUEST, RESPONSE> implements InternalPlcProprietaryResponse<REQUEST, RESPONSE> {
+public class DefaultPlcProprietaryResponse<RESPONSE> implements InternalPlcProprietaryResponse<RESPONSE> {
- private final InternalPlcProprietaryRequest<REQUEST> plcProprietaryRequest;
+ private final InternalPlcProprietaryRequest plcProprietaryRequest;
private final RESPONSE proprietaryResponse;
- public DefaultPlcProprietaryResponse(InternalPlcProprietaryRequest<REQUEST> plcProprietaryRequest, RESPONSE proprietaryResponse) {
+ public DefaultPlcProprietaryResponse(InternalPlcProprietaryRequest plcProprietaryRequest, RESPONSE proprietaryResponse) {
this.plcProprietaryRequest = plcProprietaryRequest;
this.proprietaryResponse = proprietaryResponse;
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
index 9b60fe4..d835bc0 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcReadRequest.java
@@ -19,23 +19,33 @@ under the License.
package org.apache.plc4x.java.base.messages;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.base.connection.PlcFieldHandler;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPlcFieldRequest {
+ private final PlcReader reader;
private LinkedHashMap<String, PlcField> fields;
- protected DefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
+ protected DefaultPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields) {
+ this.reader = reader;
this.fields = fields;
}
@Override
+ public CompletableFuture<PlcReadResponse> execute() {
+ return reader.read(this);
+ }
+
+ @Override
public int getNumberOfFields() {
return fields.size();
}
@@ -64,12 +74,18 @@ public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPl
.collect(Collectors.toCollection(LinkedList::new));
}
+ protected PlcReader getReader() {
+ return reader;
+ }
+
public static class Builder implements PlcReadRequest.Builder {
+ private final PlcReader reader;
private final PlcFieldHandler fieldHandler;
private final Map<String, String> fields;
- public Builder(PlcFieldHandler fieldHandler) {
+ public Builder(PlcReader reader, PlcFieldHandler fieldHandler) {
+ this.reader = reader;
this.fieldHandler = fieldHandler;
fields = new TreeMap<>();
}
@@ -90,7 +106,7 @@ public class DefaultPlcReadRequest implements InternalPlcReadRequest, InternalPl
PlcField parsedField = fieldHandler.createField(fieldQuery);
parsedFields.put(name, parsedField);
});
- return new DefaultPlcReadRequest(parsedFields);
+ return new DefaultPlcReadRequest(reader, parsedFields);
}
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index 3194cf5..35bf369 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -20,7 +20,9 @@ package org.apache.plc4x.java.base.messages;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
import org.apache.plc4x.java.base.connection.PlcFieldHandler;
@@ -28,11 +30,23 @@ import org.apache.plc4x.java.base.messages.items.FieldItem;
import java.time.Duration;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
// TODO: request broken needs finishing.
public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionRequest, InternalPlcFieldRequest {
+ private final PlcSubscriber subscriber;
+
+ public DefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public CompletableFuture<PlcSubscriptionResponse> execute() {
+ return subscriber.subscribe(this);
+ }
+
@Override
public int getNumberOfFields() {
throw new IllegalStateException("not available");
@@ -65,10 +79,12 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
public static class Builder implements PlcSubscriptionRequest.Builder {
+ private final PlcSubscriber subscriber;
private final PlcFieldHandler fieldHandler;
private final Map<String, BuilderItem<Object>> fields;
- public Builder(PlcFieldHandler fieldHandler) {
+ public Builder(PlcSubscriber subscriber, PlcFieldHandler fieldHandler) {
+ this.subscriber = subscriber;
this.fieldHandler = fieldHandler;
fields = new TreeMap<>();
}
@@ -99,7 +115,7 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq
FieldItem fieldItem = builderItem.encoder.apply(parsedField, null);
parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
});
- return new DefaultPlcSubscriptionRequest();
+ return new DefaultPlcSubscriptionRequest(subscriber);
}
private static class BuilderItem<T> {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
index 03e1f3e..45300a5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcUnsubscriptionRequest.java
@@ -19,24 +19,35 @@
package org.apache.plc4x.java.base.messages;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.base.model.InternalPlcSubscriptionHandle;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
// TODO: request broken needs finishing.
public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptionRequest, InternalPlcFieldRequest {
+ private final PlcSubscriber subscriber;
+
private final Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles;
- public DefaultPlcUnsubscriptionRequest(Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+ public DefaultPlcUnsubscriptionRequest(PlcSubscriber subscriber, Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+ this.subscriber = subscriber;
this.internalPlcSubscriptionHandles = internalPlcSubscriptionHandles;
}
@Override
+ public CompletableFuture<PlcUnsubscriptionResponse> execute() {
+ return subscriber.unsubscribe(this);
+ }
+
+ @Override
public int getNumberOfFields() {
throw new IllegalStateException("not available");
}
@@ -68,9 +79,11 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
public static class Builder implements PlcUnsubscriptionRequest.Builder {
+ private final PlcSubscriber subscriber;
private List<InternalPlcSubscriptionHandle> plcSubscriptionHandles;
- public Builder() {
+ public Builder(PlcSubscriber subscriber) {
+ this.subscriber = subscriber;
plcSubscriptionHandles = new ArrayList<>();
}
@@ -94,7 +107,7 @@ public class DefaultPlcUnsubscriptionRequest implements InternalPlcUnsubscriptio
@Override
public PlcUnsubscriptionRequest build() {
- return new DefaultPlcUnsubscriptionRequest(plcSubscriptionHandles);
+ return new DefaultPlcUnsubscriptionRequest(subscriber, plcSubscriptionHandles);
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
index c78ff08..d475b83 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcWriteRequest.java
@@ -21,8 +21,10 @@ package org.apache.plc4x.java.base.messages;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.base.connection.PlcFieldHandler;
import org.apache.plc4x.java.base.messages.items.FieldItem;
@@ -33,18 +35,26 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, InternalPlcFieldRequest {
+ private final PlcWriter writer;
private final LinkedHashMap<String, Pair<PlcField, FieldItem>> fields;
- protected DefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+ protected DefaultPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+ this.writer = writer;
this.fields = fields;
}
@Override
+ public CompletableFuture<PlcWriteResponse> execute() {
+ return writer.write(this);
+ }
+
+ @Override
public int getNumberOfFields() {
return fields.size();
}
@@ -106,11 +116,13 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
public static class Builder implements PlcWriteRequest.Builder {
+ private final PlcWriter writer;
private final PlcFieldHandler fieldHandler;
private final Map<String, BuilderItem<Object>> fields;
private final Map<Class<?>, BiFunction<PlcField, Object[], FieldItem>> handlerMap;
- public Builder(PlcFieldHandler fieldHandler) {
+ public Builder(PlcWriter writer, PlcFieldHandler fieldHandler) {
+ this.writer = writer;
this.fieldHandler = fieldHandler;
fields = new TreeMap<>();
handlerMap = new HashMap<>();
@@ -236,7 +248,7 @@ public class DefaultPlcWriteRequest implements InternalPlcWriteRequest, Internal
FieldItem fieldItem = builderItem.encoder.apply(parsedField, builderItem.values);
parsedFields.put(name, new ImmutablePair<>(parsedField, fieldItem));
});
- return new DefaultPlcWriteRequest(parsedFields);
+ return new DefaultPlcWriteRequest(writer, parsedFields);
}
private Builder addItem(String name, String fieldQuery, Object[] values, BiFunction<PlcField, Object[], FieldItem> encoder) {
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
index 8486388..78f6320 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcFieldResponse.java
@@ -20,7 +20,7 @@ package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcFieldResponse;
-public interface InternalPlcFieldResponse<REQUEST_TYPE extends InternalPlcFieldRequest> extends PlcFieldResponse<REQUEST_TYPE> {
+public interface InternalPlcFieldResponse extends PlcFieldResponse {
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
index eb5aa3b..1bbea2f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcProprietaryResponse.java
@@ -20,5 +20,5 @@ package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcProprietaryResponse;
-public interface InternalPlcProprietaryResponse<REQUEST, RESPONSE> extends PlcProprietaryResponse<InternalPlcProprietaryRequest<REQUEST>, RESPONSE>, InternalPlcResponse<InternalPlcProprietaryRequest<REQUEST>> {
+public interface InternalPlcProprietaryResponse<RESPONSE> extends PlcProprietaryResponse<RESPONSE>, InternalPlcResponse {
}
\ No newline at end of file
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
index 775a4c7..53cf0b5 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcReadResponse.java
@@ -25,7 +25,7 @@ import org.apache.plc4x.java.base.messages.items.FieldItem;
import java.util.Map;
-public interface InternalPlcReadResponse extends PlcReadResponse<InternalPlcReadRequest>, InternalPlcResponse<InternalPlcReadRequest> {
+public interface InternalPlcReadResponse extends PlcReadResponse, InternalPlcResponse {
Map<String, Pair<PlcResponseCode, FieldItem>> getValues();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
index 95c9223..dde16f0 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcResponse.java
@@ -20,5 +20,6 @@ package org.apache.plc4x.java.base.messages;
import org.apache.plc4x.java.api.messages.PlcResponse;
-public interface InternalPlcResponse<REQUEST_TYPE extends InternalPlcRequest> extends PlcResponse<REQUEST_TYPE> {
+public interface InternalPlcResponse extends PlcResponse {
+
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
index dbcdc5c..06d4ad7 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcWriteResponse.java
@@ -23,6 +23,6 @@ import org.apache.plc4x.java.api.types.PlcResponseCode;
import java.util.Map;
-public interface InternalPlcWriteResponse extends PlcWriteResponse<InternalPlcWriteRequest>, InternalPlcResponse<InternalPlcWriteRequest> {
+public interface InternalPlcWriteResponse extends PlcWriteResponse, InternalPlcResponse {
Map<String, PlcResponseCode> getValues();
}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index a9c4a63..3b13a88 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -25,6 +25,8 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.PromiseCombiner;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
+import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
import org.apache.plc4x.java.api.exceptions.PlcTimeoutException;
import org.apache.plc4x.java.api.model.PlcField;
@@ -49,26 +51,29 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
private final Timer timer;
+ private final PlcReader reader;
+ private final PlcWriter writer;
+
// TODO: maybe better get from map
private long defaultReceiveTimeout;
private PendingWriteQueue queue;
- private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>, Timeout> scheduledTimeouts;
+ private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>, Timeout> scheduledTimeouts;
// Map to track send subcontainers
- private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
+ private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>> sentButUnacknowledgedSubContainer;
// Map to map tdpu to original parent container
// TODO: currently this could be supplied via param, only reason to keep would be for statistics.
- private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> correlationToParentContainer;
+ private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>> correlationToParentContainer;
// Map to track tdpus per container
// TODO: currently this could be supplied via param, only reason to keep would be for statistics.
private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
// Map to track a list of responses per parent container
- private ConcurrentMap<PlcRequestContainer<?, ?>, Queue<InternalPlcResponse<?>>> responsesToBeDelivered;
+ private ConcurrentMap<PlcRequestContainer<?, ?>, Queue<InternalPlcResponse>> responsesToBeDelivered;
private AtomicInteger correlationIdGenerator;
@@ -81,15 +86,17 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
private AtomicLong erroredItems;
- public SingleItemToSingleRequestProtocol(Timer timer) {
- this(timer, true);
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer) {
+ this(reader, writer, timer, true);
}
- public SingleItemToSingleRequestProtocol(Timer timer, boolean betterImplementationPossible) {
- this(timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, boolean betterImplementationPossible) {
+ this(reader, writer, timer, TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
}
- public SingleItemToSingleRequestProtocol(Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+ public SingleItemToSingleRequestProtocol(PlcReader reader, PlcWriter writer, Timer timer, long defaultReceiveTimeout, boolean betterImplementationPossible) {
+ this.reader = reader;
+ this.writer = writer;
this.timer = timer;
this.defaultReceiveTimeout = defaultReceiveTimeout;
if (betterImplementationPossible) {
@@ -155,16 +162,16 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
// Decoding
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+ protected void tryFinish(Integer currentTdpu, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse> originalResponseFuture) {
deliveredItems.incrementAndGet();
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
LOGGER.info("{} got acknowledged", subPlcRequestContainer);
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
if (originalPlcRequestContainer == null) {
LOGGER.warn("Unrelated package received {}", msg);
return;
}
- Queue<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new ConcurrentLinkedQueue<>());
+ Queue<InternalPlcResponse> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new ConcurrentLinkedQueue<>());
correlatedResponseItems.add(msg);
Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer);
integers.remove(currentTdpu);
@@ -175,7 +182,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
timeout.cancel();
}
- InternalPlcResponse<?> plcResponse;
+ InternalPlcResponse plcResponse;
if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest();
HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
@@ -206,13 +213,13 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
}
- protected void errored(Integer currentTdpu, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+ protected void errored(Integer currentTdpu, Throwable throwable, CompletableFuture<InternalPlcResponse> originalResponseFuture) {
erroredItems.incrementAndGet();
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(currentTdpu);
LOGGER.info("{} got errored", subPlcRequestContainer);
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> originalPlcRequestContainer = correlationToParentContainer.remove(currentTdpu);
if (originalPlcRequestContainer == null) {
LOGGER.warn("Unrelated error received tdpu:{}", currentTdpu, throwable);
} else {
@@ -246,7 +253,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof PlcRequestContainer) {
@SuppressWarnings("unchecked")
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse>) msg;
Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> ConcurrentHashMap.newKeySet());
Timeout timeout = timer.newTimeout(timeout_ -> handleTimeout(timeout_, in, tdpus, System.nanoTime()), defaultReceiveTimeout, TimeUnit.MILLISECONDS);
@@ -275,7 +282,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
}
});
- PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture);
+ PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(reader, field, tdpu), correlatedCompletableFuture);
correlationToParentContainer.put(tdpu, in);
queue.add(correlatedPlcRequestContainer, subPromise);
if (!tdpus.add(tdpu)) {
@@ -298,7 +305,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
}
});
- PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture);
+ PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(writer, fieldItemTriple, tdpu), correlatedCompletableFuture);
correlationToParentContainer.put(tdpu, in);
queue.add(correlatedPlcRequestContainer, subPromise);
if (!tdpus.add(tdpu)) {
@@ -360,7 +367,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
ctx.flush();
}
- private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in, Set<Integer> tdpus, long scheduledAt) {
+ private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse> in, Set<Integer> tdpus, long scheduledAt) {
if (timeout.isCancelled()) {
LOGGER.debug("container {} with timeout {} got canceled", in, timeout);
return;
@@ -386,15 +393,15 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
protected final int tdpu;
- protected CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
- super(fields);
+ protected CorrelatedPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields, int tdpu) {
+ super(reader, fields);
this.tdpu = tdpu;
}
- protected static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+ protected static CorrelatedPlcReadRequest of(PlcReader reader, Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
- return new CorrelatedPlcReadRequest(fields, tdpu);
+ return new CorrelatedPlcReadRequest(reader, fields, tdpu);
}
@Override
@@ -407,15 +414,15 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
private final int tdpu;
- public CorrelatedPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
- super(fields);
+ public CorrelatedPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields, int tdpu) {
+ super(writer, fields);
this.tdpu = tdpu;
}
- public static CorrelatedPlcWriteRequest of(Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
+ public static CorrelatedPlcWriteRequest of(PlcWriter writer, Triple<String, PlcField, FieldItem> fieldItemTriple, int tdpu) {
LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
fields.put(fieldItemTriple.getLeft(), Pair.of(fieldItemTriple.getMiddle(), fieldItemTriple.getRight()));
- return new CorrelatedPlcWriteRequest(fields, tdpu);
+ return new CorrelatedPlcWriteRequest(writer, fields, tdpu);
}
@Override
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index 47d06c5..c97d2a6 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -25,6 +25,9 @@ import io.netty.channel.PendingWriteQueue;
import io.netty.util.HashedWheelTimer;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
+import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.messages.PlcFieldRequest;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -52,8 +55,18 @@ import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class SingleItemToSingleRequestProtocolTest implements WithAssertions {
+ PlcReader mockReader = null;
+ PlcWriter mockWriter = null;
+ PlcSubscriber mockSubscriber = null;
+
@InjectMocks
- SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(new HashedWheelTimer(), TimeUnit.SECONDS.toMillis(1), false);
+ SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(
+ mockReader,
+ mockWriter,
+ new HashedWheelTimer(),
+ TimeUnit.SECONDS.toMillis(1),
+ false
+ );
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ChannelHandlerContext channelHandlerContext;
@@ -138,7 +151,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
void simpleRead() throws Exception {
// Given
// we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
// When
// we write this
SUT.write(channelHandlerContext, msg, channelPromise);
@@ -169,7 +182,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
void partialRead() throws Exception {
// Given
// we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
// When
// we write this
SUT.write(channelHandlerContext, msg, channelPromise);
@@ -202,7 +215,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
void partialReadOneErrored() throws Exception {
// Given
// we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
// When
// we write this
SUT.write(channelHandlerContext, msg, channelPromise);
@@ -241,7 +254,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
void noRead() throws Exception {
// Given
// we have a simple read
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
// When
// we write this
SUT.write(channelHandlerContext, msg, channelPromise);
@@ -314,7 +327,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
@Test
void read() throws Exception {
// Given
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(mockReader), responseCompletableFuture);
// When
SUT.write(channelHandlerContext, msg, channelPromise);
// Then
@@ -345,7 +358,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
@Test
void write() throws Exception {
// Given
- PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(), responseCompletableFuture);
+ PlcRequestContainer<?, ?> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(mockWriter), responseCompletableFuture);
// When
SUT.write(channelHandlerContext, msg, channelPromise);
// Then
@@ -395,48 +408,51 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
}
private static class TestDefaultPlcReadRequest extends DefaultPlcReadRequest {
-
- private TestDefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
- super(fields);
+ private TestDefaultPlcReadRequest(PlcReader reader, LinkedHashMap<String, PlcField> fields) {
+ super(reader, fields);
}
- private static TestDefaultPlcReadRequest build() {
+ private static TestDefaultPlcReadRequest build(PlcReader reader) {
LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
IntStream.rangeClosed(1, 5).forEach(i -> fields.put("readField" + i, mock(PlcField.class)));
- return new TestDefaultPlcReadRequest(fields);
+ return new TestDefaultPlcReadRequest(reader, fields);
}
}
private static class TestDefaultPlcWriteRequest extends DefaultPlcWriteRequest {
- private TestDefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
- super(fields);
+ private TestDefaultPlcWriteRequest(PlcWriter writer, LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+ super(writer, fields);
}
- private static TestDefaultPlcWriteRequest build() {
+ private static TestDefaultPlcWriteRequest build(PlcWriter writer) {
LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
IntStream.rangeClosed(1, 5).forEach(i -> fields.put("writeField" + i, Pair.of(mock(PlcField.class), mock(FieldItem.class))));
- return new TestDefaultPlcWriteRequest(fields);
+ return new TestDefaultPlcWriteRequest(writer, fields);
}
}
private static class TestDefaultPlcSubscriptionRequest extends DefaultPlcSubscriptionRequest {
- private static TestDefaultPlcSubscriptionRequest build() {
+ private TestDefaultPlcSubscriptionRequest(PlcSubscriber subscriber) {
+ super(subscriber);
+ }
+
+ private static TestDefaultPlcSubscriptionRequest build(PlcSubscriber subscriber) {
// TODO: implement me once available
- return new TestDefaultPlcSubscriptionRequest();
+ return new TestDefaultPlcSubscriptionRequest(subscriber);
}
}
private static class TestDefaultPlcUnsubscriptionRequest extends DefaultPlcUnsubscriptionRequest {
- private TestDefaultPlcUnsubscriptionRequest(Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
- super(internalPlcSubscriptionHandles);
+ private TestDefaultPlcUnsubscriptionRequest(PlcSubscriber subscriber, Collection<? extends InternalPlcSubscriptionHandle> internalPlcSubscriptionHandles) {
+ super(subscriber, internalPlcSubscriptionHandles);
}
- private static TestDefaultPlcUnsubscriptionRequest build() {
+ private static TestDefaultPlcUnsubscriptionRequest build(PlcSubscriber subscriber) {
// TODO: implement me once available
- return new TestDefaultPlcUnsubscriptionRequest(Collections.emptyList());
+ return new TestDefaultPlcUnsubscriptionRequest(subscriber, Collections.emptyList());
}
}
}
\ No newline at end of file