You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/27 10:15:58 UTC
[incubator-plc4x] 07/07: [General]
SingleItemToSingleRequestProtocol fixed a bunch of bugs and added tests
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch feature/TopLevelItemSpliting
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 5951fc5e1ecc208363f653175c922b7ca2a3b53a
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Sep 27 12:13:30 2018 +0200
[General] SingleItemToSingleRequestProtocol fixed a bunch of bugs and added tests
---
.../SingleItemToSingleRequestProtocol.java | 125 ++++++---
.../SingleItemToSingleRequestProtocolTest.java | 290 +++++++++++++++++++++
pom.xml | 34 ++-
3 files changed, 399 insertions(+), 50 deletions(-)
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index fa578f2..724ffad 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -36,29 +36,50 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
/**
* This layer can be used to split a {@link org.apache.plc4x.java.api.messages.PlcRequest} which addresses multiple {@link PlcField}s into multiple subsequent {@link org.apache.plc4x.java.api.messages.PlcRequest}s.
*/
-// TODO: write test
public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
public static final Logger LOGGER = LoggerFactory.getLogger(SingleItemToSingleRequestProtocol.class);
private PendingWriteQueue queue;
- private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedRequestItems;
+ // Map to track send subcontainers
+ private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
+ // Map to map tdpu to original parent container
+ private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> correlationToParentContainer;
+
+ // Map to track tdpus per container
private ConcurrentMap<PlcRequestContainer<?, ?>, Set<Integer>> containerCorrelationIdMap;
+ // Map to track a list of responses per parent container
private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDelivered;
private AtomicInteger correlationId;
+ public SingleItemToSingleRequestProtocol() {
+ this(true);
+ }
+
+ public SingleItemToSingleRequestProtocol(boolean betterImplementationPossible) {
+ if (betterImplementationPossible) {
+ String callStack = Arrays.stream(Thread.currentThread().getStackTrace())
+ .map(StackTraceElement::toString)
+ .collect(Collectors.joining("\n"));
+ LOGGER.warn("Unoptimized Usage of {} detected at:\n{}", this.getClass(), callStack);
+ LOGGER.info("Consider implementing item splitting native to the protocol.");
+ }
+ }
+
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.queue = new PendingWriteQueue(ctx);
- this.sentButUnacknowledgedRequestItems = new ConcurrentHashMap<>();
+ this.sentButUnacknowledgedSubContainer = new ConcurrentHashMap<>();
+ this.correlationToParentContainer = new ConcurrentHashMap<>();
this.containerCorrelationIdMap = new ConcurrentHashMap<>();
this.responsesToBeDelivered = new ConcurrentHashMap<>();
this.correlationId = new AtomicInteger();
@@ -68,6 +89,11 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
this.queue.removeAndWriteAll();
+ this.sentButUnacknowledgedSubContainer.clear();
+ this.correlationToParentContainer.clear();
+ this.containerCorrelationIdMap.clear();
+ this.responsesToBeDelivered.clear();
+ this.correlationId.set(0);
super.channelUnregistered(ctx);
}
@@ -82,20 +108,22 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
// Decoding
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- private void tryFinish(int correlationId, InternalPlcResponse msg) {
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
- if (plcRequestContainer == null) {
+ protected void tryFinish(Integer correlationId, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+ LOGGER.info("{} got acknowledged", subPlcRequestContainer);
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId);
+ if (originalPlcRequestContainer == null) {
LOGGER.warn("Unrelated package received {}", msg);
return;
}
- List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(plcRequestContainer, ignore -> new LinkedList<>());
+ List<InternalPlcResponse<?>> correlatedResponseItems = responsesToBeDelivered.computeIfAbsent(originalPlcRequestContainer, ignore -> new LinkedList<>());
correlatedResponseItems.add(msg);
- Set<Integer> integers = containerCorrelationIdMap.get(plcRequestContainer);
+ Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer);
integers.remove(correlationId);
if (integers.isEmpty()) {
InternalPlcResponse<?> plcResponse;
- if (plcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
- InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+ if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
+ InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest();
HashMap<String, Pair<PlcResponseCode, FieldItem>> fields = new HashMap<>();
correlatedResponseItems.stream()
@@ -104,8 +132,8 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
.forEach(stringPairMap -> stringPairMap.forEach(fields::put));
plcResponse = new DefaultPlcReadResponse(internalPlcReadRequest, fields);
- } else if (plcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
- InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) plcRequestContainer.getRequest();
+ } else if (originalPlcRequestContainer.getRequest() instanceof InternalPlcWriteRequest) {
+ InternalPlcWriteRequest internalPlcWriteRequest = (InternalPlcWriteRequest) originalPlcRequestContainer.getRequest();
HashMap<String, PlcResponseCode> values = new HashMap<>();
correlatedResponseItems.stream()
@@ -115,20 +143,19 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
plcResponse = new DefaultPlcWriteResponse(internalPlcWriteRequest, values);
} else {
- throw new PlcRuntimeException("Unknown type detected " + plcRequestContainer.getRequest());
+ throw new PlcRuntimeException("Unknown type detected " + originalPlcRequestContainer.getRequest());
}
- plcRequestContainer.getResponseFuture().complete(plcResponse);
- responsesToBeDelivered.remove(plcRequestContainer);
+ responsesToBeDelivered.remove(originalPlcRequestContainer);
+ containerCorrelationIdMap.remove(originalPlcRequestContainer);
+ originalResponseFuture.complete(plcResponse);
}
}
- private void errored(int correlationId, Throwable throwable) {
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedRequestItems.remove(correlationId);
- if (plcRequestContainer == null) {
- LOGGER.warn("Unrelated error received ", throwable);
- return;
- }
- plcRequestContainer.getResponseFuture().completeExceptionally(throwable);
+ protected void errored(int correlationId, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+ // TODO: cleanup missing maps as the complete response gets canceled now.
+ LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", plcRequestContainer, correlationId, throwable);
+ originalResponseFuture.completeExceptionally(throwable);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -153,17 +180,21 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
internalPlcReadRequest.getNamedFields().forEach(field -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
- int tdpu = correlationId.getAndIncrement();
- CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
+ Integer tdpu = correlationId.getAndIncrement();
+ CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
+ // Important: don't chain to above as we want the above to be completed not the result of when complete
+ correlatedCompletableFuture
.thenApply(InternalPlcResponse.class::cast)
.whenComplete((internalPlcResponse, throwable) -> {
if (throwable != null) {
- errored(tdpu, throwable);
+ errored(tdpu, throwable, in.getResponseFuture());
} else {
- tryFinish(tdpu, internalPlcResponse);
+ tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
}
});
- queue.add(new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture), subPromise);
+ PlcRequestContainer<CorrelatedPlcReadRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcReadRequest.of(field, tdpu), correlatedCompletableFuture);
+ correlationToParentContainer.put(tdpu, in);
+ queue.add(correlatedPlcRequestContainer, subPromise);
if (!tdpus.add(tdpu)) {
throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
}
@@ -175,17 +206,19 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
- int tdpu = correlationId.getAndIncrement();
+ Integer tdpu = correlationId.getAndIncrement();
CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
.thenApply(InternalPlcResponse.class::cast)
.whenComplete((internalPlcResponse, throwable) -> {
if (throwable != null) {
- errored(tdpu, throwable);
+ errored(tdpu, throwable, in.getResponseFuture());
} else {
- tryFinish(tdpu, internalPlcResponse);
+ tryFinish(tdpu, internalPlcResponse, in.getResponseFuture());
}
});
- queue.add(new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture), subPromise);
+ PlcRequestContainer<CorrelatedPlcWriteRequest, InternalPlcResponse> correlatedPlcRequestContainer = new PlcRequestContainer<>(CorrelatedPlcWriteRequest.of(fieldItemTriple, tdpu), correlatedCompletableFuture);
+ correlationToParentContainer.put(tdpu, in);
+ queue.add(correlatedPlcRequestContainer, subPromise);
if (!tdpus.add(tdpu)) {
throw new IllegalStateException("AtomicInteger should not create duplicated ids: " + tdpu);
}
@@ -232,30 +265,30 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
if (request instanceof CorrelatedPlcRequest) {
CorrelatedPlcRequest correlatedPlcRequest = (CorrelatedPlcRequest) request;
- // Add it to the list of sentButUnacknowledgedRequestItems.
- sentButUnacknowledgedRequestItems.put(correlatedPlcRequest.getTdpu(), currentItem);
+ // Add it to the list of sentButUnacknowledgedSubContainer.
+ sentButUnacknowledgedSubContainer.put(correlatedPlcRequest.getTdpu(), currentItem);
- LOGGER.debug("Item Message with id {} sent", correlatedPlcRequest.getTdpu());
+ LOGGER.debug("container with id {} sent: ", correlatedPlcRequest.getTdpu(), currentItem);
}
}
ctx.flush();
}
- interface CorrelatedPlcRequest extends InternalPlcRequest {
+ protected interface CorrelatedPlcRequest extends InternalPlcRequest {
int getTdpu();
}
- private static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
+ protected static class CorrelatedPlcReadRequest extends DefaultPlcReadRequest implements CorrelatedPlcRequest {
- private final int tdpu;
+ protected final int tdpu;
- public CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
+ protected CorrelatedPlcReadRequest(LinkedHashMap<String, PlcField> fields, int tdpu) {
super(fields);
this.tdpu = tdpu;
}
- public static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
+ protected static CorrelatedPlcReadRequest of(Pair<String, PlcField> stringPlcFieldPair, int tdpu) {
LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
fields.put(stringPlcFieldPair.getKey(), stringPlcFieldPair.getValue());
return new CorrelatedPlcReadRequest(fields, tdpu);
@@ -267,7 +300,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
}
- private static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
+ protected static class CorrelatedPlcWriteRequest extends DefaultPlcWriteRequest implements CorrelatedPlcRequest {
private final int tdpu;
@@ -287,4 +320,16 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
return tdpu;
}
}
+
+ // TODO: maybe export to jmx
+ public Map<String, Integer> getStatistics() {
+ HashMap<String, Integer> statistics = new HashMap<>();
+ statistics.put("queue", queue.size());
+ statistics.put("sentButUnacknowledgedSubContainer", sentButUnacknowledgedSubContainer.size());
+ statistics.put("correlationToParentContainer", correlationToParentContainer.size());
+ statistics.put("containerCorrelationIdMap", containerCorrelationIdMap.size());
+ statistics.put("responsesToBeDelivered", responsesToBeDelivered.size());
+ statistics.put("currentCorrelationId", correlationId.get());
+ return statistics;
+ }
}
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
new file mode 100644
index 0000000..9a1c4ea
--- /dev/null
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -0,0 +1,290 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+
+package org.apache.plc4x.java.base.protocol;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.messages.PlcFieldRequest;
+import org.apache.plc4x.java.api.model.PlcField;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.messages.items.FieldItem;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.*;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class SingleItemToSingleRequestProtocolTest implements WithAssertions {
+
+ @InjectMocks
+ SingleItemToSingleRequestProtocol SUT;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ ChannelHandlerContext channelHandlerContext;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ ChannelPromise channelPromise;
+
+ @Mock
+ CompletableFuture<InternalPlcResponse> responseCompletableFuture;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ SUT.channelRegistered(channelHandlerContext);
+ when(channelHandlerContext.executor().inEventLoop()).thenReturn(true);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ SUT.channelUnregistered(channelHandlerContext);
+ }
+
+ @Nested
+ class Misc {
+ @Test
+ void channelRegistered() throws Exception {
+ SUT.channelRegistered(channelHandlerContext);
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("currentCorrelationId", 0)
+ );
+ }
+
+ @Test
+ void channelUnregistered() throws Exception {
+ SUT.channelUnregistered(channelHandlerContext);
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("currentCorrelationId", 0)
+ );
+ }
+
+ @Test
+ void channelInactive() throws Exception {
+ SUT.channelInactive(channelHandlerContext);
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("currentCorrelationId", 0)
+ );
+ }
+ }
+
+ @Nested
+ class Roundtrip {
+ @Captor
+ ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+ @Test
+ void simpleRead() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that all get responded
+ verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+ capturedDownstreamContainers.forEach(plcRequestContainer -> {
+ InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+ String fieldName = request.getFieldNames().iterator().next();
+ CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+ HashMap<String, Pair<PlcResponseCode, FieldItem>> responseFields = new HashMap<>();
+ responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(FieldItem.class)));
+ responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
+ });
+ // Then
+ // our complete container should complete normally
+ verify(responseCompletableFuture).complete(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("currentCorrelationId", 5)
+ );
+ }
+ }
+
+ @Nested
+ class Decoding {
+ @Test
+ void tryFinish() throws Exception {
+ SUT.tryFinish(1, null, new CompletableFuture<>());
+ // TODO: add Assertions.
+ }
+
+ @Test
+ void errored() throws Exception {
+ SUT.errored(1, mock(Throwable.class), new CompletableFuture<>());
+ // TODO: add Assertions.
+ }
+ }
+
+ @Nested
+ class Encoding {
+
+ @Captor
+ ArgumentCaptor<PlcRequestContainer> plcRequestContainerArgumentCaptor;
+
+ @Test
+ void empty() throws Exception {
+ // Given
+ Object msg = null;
+ // When
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // Then
+ verify(channelHandlerContext, times(1)).write(null, channelPromise);
+ }
+
+ @Test
+ void read() throws Exception {
+ // Given
+ PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ // When
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // Then
+ verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedValues = plcRequestContainerArgumentCaptor.getAllValues();
+ // We check if every request as exactly one field
+ assertThat(capturedValues)
+ .allMatch(plcRequestContainer -> plcRequestContainer.getRequest() instanceof SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest)
+ .allMatch(plcRequestContainer -> ((SingleItemToSingleRequestProtocol.CorrelatedPlcReadRequest) plcRequestContainer.getRequest()).getNumberOfFields() == 1);
+ // In sum we should see all fields
+ List<String> fieldNamesList = capturedValues.stream()
+ .map(PlcRequestContainer::getRequest)
+ .map(PlcFieldRequest.class::cast)
+ .map(PlcFieldRequest::getFieldNames)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ // There should be no duplications
+ assertThat(fieldNamesList).hasSize(5);
+ assertThat(fieldNamesList).containsExactly(
+ "readField1",
+ "readField2",
+ "readField3",
+ "readField4",
+ "readField5"
+ );
+ }
+
+ @Test
+ void write() throws Exception {
+ // Given
+ PlcRequestContainer<TestDefaultPlcWriteRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcWriteRequest.build(), responseCompletableFuture);
+ // When
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // Then
+ verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedValues = plcRequestContainerArgumentCaptor.getAllValues();
+ // We check if every request as exactly one field
+ assertThat(capturedValues)
+ .allMatch(plcRequestContainer -> plcRequestContainer.getRequest() instanceof SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest)
+ .allMatch(plcRequestContainer -> ((SingleItemToSingleRequestProtocol.CorrelatedPlcWriteRequest) plcRequestContainer.getRequest()).getNumberOfFields() == 1);
+ // In sum we should see all fields
+ List<String> fieldNamesList = capturedValues.stream()
+ .map(PlcRequestContainer::getRequest)
+ .map(PlcFieldRequest.class::cast)
+ .map(PlcFieldRequest::getFieldNames)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ // There should be no duplications
+ assertThat(fieldNamesList).hasSize(5);
+ assertThat(fieldNamesList).containsExactly(
+ "writeField1",
+ "writeField2",
+ "writeField3",
+ "writeField4",
+ "writeField5"
+ );
+ }
+
+ @Test
+ void trySendingMessages() throws Exception {
+ SUT.trySendingMessages(channelHandlerContext);
+ // TODO: add assertions
+ }
+ }
+
+ private static class TestDefaultPlcReadRequest extends DefaultPlcReadRequest {
+
+ private TestDefaultPlcReadRequest(LinkedHashMap<String, PlcField> fields) {
+ super(fields);
+ }
+
+ private static TestDefaultPlcReadRequest build() {
+ LinkedHashMap<String, PlcField> fields = new LinkedHashMap<>();
+ fields.put("readField1", mock(PlcField.class));
+ fields.put("readField2", mock(PlcField.class));
+ fields.put("readField3", mock(PlcField.class));
+ fields.put("readField4", mock(PlcField.class));
+ fields.put("readField5", mock(PlcField.class));
+ return new TestDefaultPlcReadRequest(fields);
+ }
+ }
+
+ private static class TestDefaultPlcWriteRequest extends DefaultPlcWriteRequest {
+
+ private TestDefaultPlcWriteRequest(LinkedHashMap<String, Pair<PlcField, FieldItem>> fields) {
+ super(fields);
+ }
+
+ private static TestDefaultPlcWriteRequest build() {
+ LinkedHashMap<String, Pair<PlcField, FieldItem>> fields = new LinkedHashMap<>();
+ fields.put("writeField1", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+ fields.put("writeField2", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+ fields.put("writeField3", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+ fields.put("writeField4", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+ fields.put("writeField5", Pair.of(mock(PlcField.class), mock(FieldItem.class)));
+ return new TestDefaultPlcWriteRequest(fields);
+ }
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b4b2867..d1ad775 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,8 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -301,7 +302,7 @@
</goals>
<configuration>
<rules>
- <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+ <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
</rules>
</configuration>
</execution>
@@ -490,7 +491,9 @@
</goals>
<configuration>
<url>https://stackpath.bootstrapcdn.com/bootswatch/${bootstrap.version}/flatly/bootstrap.min.css</url>
- <outputDirectory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/</outputDirectory>
+ <outputDirectory>
+ ${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}/css/
+ </outputDirectory>
<outputFileName>bootstrap.min.css</outputFileName>
</configuration>
</execution>
@@ -501,7 +504,9 @@
<goal>wget</goal>
</goals>
<configuration>
- <url>https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip</url>
+ <url>
+ https://use.fontawesome.com/releases/v${fontawesome.version}/fontawesome-free-${fontawesome.version}-web.zip
+ </url>
<unpack>true</unpack>
<outputDirectory>${project.build.directory}/dependency/fontawesome</outputDirectory>
</configuration>
@@ -523,14 +528,18 @@
<outputDirectory>${project.build.directory}/site</outputDirectory>
<resources>
<resource>
- <directory>${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version}</directory>
+ <directory>
+ ${project.build.directory}/dependency/META-INF/resources/webjars/anchorjs/${anchorjs.version}
+ </directory>
<targetPath>${project.build.directory}/site/js</targetPath>
<includes>
<include>anchor.min.js</include>
</includes>
</resource>
<resource>
- <directory>${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}</directory>
+ <directory>
+ ${project.build.directory}/dependency/META-INF/resources/webjars/bootstrap/${bootstrap.version}
+ </directory>
<includes>
<include>css/bootstrap.min.css</include>
<include>js/bootstrap.min.js</include>
@@ -538,17 +547,22 @@
</includes>
</resource>
<resource>
- <directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web</directory>
+ <directory>
+ ${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web
+ </directory>
<includes>
<include>css/all.min.css</include>
</includes>
</resource>
<resource>
- <directory>${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts</directory>
+ <directory>
+ ${project.build.directory}/dependency/fontawesome/fontawesome-free-${fontawesome.version}-web/webfonts
+ </directory>
<targetPath>${project.build.directory}/site/fonts</targetPath>
</resource>
<resource>
- <directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version}</directory>
+ <directory>${project.build.directory}/dependency/META-INF/resources/webjars/jquery/${jquery.version}
+ </directory>
<targetPath>${project.build.directory}/site/js</targetPath>
<includes>
<include>jquery.min.js</include>
@@ -860,7 +874,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>