You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/09/28 08:38:42 UTC
[incubator-plc4x] branch master updated: [General]
SingleItemToSingleRequestProtocol fixed potential memory leak by applying
receive timeouts
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push:
new 441bcde [General] SingleItemToSingleRequestProtocol fixed potential memory leak by applying receive timeouts
441bcde is described below
commit 441bcde36196a3dd61296dec513f81867ecf6dc6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Sep 28 10:38:34 2018 +0200
[General] SingleItemToSingleRequestProtocol fixed potential memory leak
by applying receive timeouts
---
.../java/api/exceptions/PlcTimeoutException.java | 46 ++++++++
.../SingleItemToSingleRequestProtocol.java | 121 +++++++++++++++++++--
.../SingleItemToSingleRequestProtocolTest.java | 80 +++++++++++---
3 files changed, 222 insertions(+), 25 deletions(-)
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/exceptions/PlcTimeoutException.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/exceptions/PlcTimeoutException.java
new file mode 100644
index 0000000..ddba534
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/exceptions/PlcTimeoutException.java
@@ -0,0 +1,46 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+
+package org.apache.plc4x.java.api.exceptions;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Can be thrown when something times out.
+ */
+public class PlcTimeoutException extends PlcRuntimeException {
+ private final long timeout;
+
+ /**
+ * Indicates something timed out.
+ *
+ * @param timeout in nanoseconds.
+ */
+ public PlcTimeoutException(long timeout) {
+ super("Timeout reached after " + TimeUnit.NANOSECONDS.toMillis(timeout) + "ms");
+ this.timeout = timeout;
+ }
+
+ /**
+ * @return the timeout in nanoseconds.
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+}
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
index 724ffad..bdd1a7f 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocol.java
@@ -19,11 +19,15 @@
package org.apache.plc4x.java.base.protocol;
import io.netty.channel.*;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.PromiseCombiner;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.exceptions.PlcTimeoutException;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.base.messages.*;
@@ -35,7 +39,9 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
@@ -47,6 +53,13 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
private PendingWriteQueue queue;
+ private Timer timer;
+
+ // TODO: maybe better get from map
+ private long defaultReceiveTimeout;
+
+ private ConcurrentMap<PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>, Timeout> scheduledTimeouts;
+
// Map to track send subcontainers
private ConcurrentMap<Integer, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>> sentButUnacknowledgedSubContainer;
@@ -59,13 +72,27 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
// Map to track a list of responses per parent container
private ConcurrentMap<PlcRequestContainer<?, ?>, List<InternalPlcResponse<?>>> responsesToBeDelivered;
- private AtomicInteger correlationId;
+ private AtomicInteger correlationIdGenerator;
+
+ // TODO: maybe put in map per day or per hour
+ private AtomicLong deliveredContainers;
+
+ private AtomicLong erroredContainers;
+
+ private AtomicLong deliveredItems;
+
+ private AtomicLong erroredItems;
public SingleItemToSingleRequestProtocol() {
this(true);
}
public SingleItemToSingleRequestProtocol(boolean betterImplementationPossible) {
+ this(TimeUnit.SECONDS.toMillis(30), betterImplementationPossible);
+ }
+
+ public SingleItemToSingleRequestProtocol(long defaultReceiveTimeout, boolean betterImplementationPossible) {
+ this.defaultReceiveTimeout = defaultReceiveTimeout;
if (betterImplementationPossible) {
String callStack = Arrays.stream(Thread.currentThread().getStackTrace())
.map(StackTraceElement::toString)
@@ -78,22 +105,34 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.queue = new PendingWriteQueue(ctx);
+ this.timer = new HashedWheelTimer();
+ this.scheduledTimeouts = new ConcurrentHashMap<>();
this.sentButUnacknowledgedSubContainer = new ConcurrentHashMap<>();
this.correlationToParentContainer = new ConcurrentHashMap<>();
this.containerCorrelationIdMap = new ConcurrentHashMap<>();
this.responsesToBeDelivered = new ConcurrentHashMap<>();
- this.correlationId = new AtomicInteger();
+ this.correlationIdGenerator = new AtomicInteger();
+ this.deliveredItems = new AtomicLong();
+ this.erroredItems = new AtomicLong();
+ this.deliveredContainers = new AtomicLong();
+ this.erroredContainers = new AtomicLong();
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
this.queue.removeAndWriteAll();
+ this.timer.stop();
+ this.scheduledTimeouts.clear();
this.sentButUnacknowledgedSubContainer.clear();
this.correlationToParentContainer.clear();
this.containerCorrelationIdMap.clear();
this.responsesToBeDelivered.clear();
- this.correlationId.set(0);
+ this.correlationIdGenerator.set(0);
+ this.deliveredItems.set(0);
+ this.erroredItems.set(0);
+ this.deliveredContainers.set(0);
+ this.erroredContainers.set(0);
super.channelUnregistered(ctx);
}
@@ -101,6 +140,17 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Send everything so we get a proper failure for those pending writes
this.queue.removeAndWriteAll();
+ this.timer.stop();
+ this.scheduledTimeouts.clear();
+ this.sentButUnacknowledgedSubContainer.clear();
+ this.correlationToParentContainer.clear();
+ this.containerCorrelationIdMap.clear();
+ this.responsesToBeDelivered.clear();
+ this.correlationIdGenerator.set(0);
+ this.deliveredItems.set(0);
+ this.erroredItems.set(0);
+ this.deliveredContainers.set(0);
+ this.erroredContainers.set(0);
super.channelInactive(ctx);
}
@@ -109,6 +159,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
protected void tryFinish(Integer correlationId, InternalPlcResponse msg, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
+ deliveredItems.incrementAndGet();
PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
LOGGER.info("{} got acknowledged", subPlcRequestContainer);
PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId);
@@ -121,6 +172,12 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
Set<Integer> integers = containerCorrelationIdMap.get(originalPlcRequestContainer);
integers.remove(correlationId);
if (integers.isEmpty()) {
+ deliveredContainers.incrementAndGet();
+ Timeout timeout = scheduledTimeouts.remove(originalPlcRequestContainer);
+ if (timeout != null) {
+ timeout.cancel();
+ }
+
InternalPlcResponse<?> plcResponse;
if (originalPlcRequestContainer.getRequest() instanceof InternalPlcReadRequest) {
InternalPlcReadRequest internalPlcReadRequest = (InternalPlcReadRequest) originalPlcRequestContainer.getRequest();
@@ -152,10 +209,25 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
protected void errored(int correlationId, Throwable throwable, CompletableFuture<InternalPlcResponse<?>> originalResponseFuture) {
- PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> plcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
- // TODO: cleanup missing maps as the complete response gets canceled now.
- LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", plcRequestContainer, correlationId, throwable);
- originalResponseFuture.completeExceptionally(throwable);
+ erroredItems.incrementAndGet();
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> subPlcRequestContainer = sentButUnacknowledgedSubContainer.remove(correlationId);
+ LOGGER.info("{} got errored", subPlcRequestContainer);
+
+
+ PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> originalPlcRequestContainer = correlationToParentContainer.remove(correlationId);
+ if (originalPlcRequestContainer == null) {
+ LOGGER.warn("Unrelated error received correlationId:{}", correlationId, throwable);
+ } else {
+ erroredContainers.incrementAndGet();
+ Timeout timeout = scheduledTimeouts.remove(originalPlcRequestContainer);
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ responsesToBeDelivered.remove(originalPlcRequestContainer);
+ containerCorrelationIdMap.remove(originalPlcRequestContainer);
+ LOGGER.warn("PlcRequestContainer {} and correlationId {} failed ", correlationToParentContainer, correlationId, throwable);
+ originalResponseFuture.completeExceptionally(throwable);
+ }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -169,6 +241,9 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in = (PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>>) msg;
Set<Integer> tdpus = containerCorrelationIdMap.computeIfAbsent(in, plcRequestContainer -> new HashSet<>());
+ Timeout timeout = timer.newTimeout(timeout_ -> handleTimeout(timeout_, in, tdpus, System.nanoTime()), defaultReceiveTimeout, TimeUnit.MILLISECONDS);
+ scheduledTimeouts.put(in, timeout);
+
// Create a promise that has to be called multiple times.
PromiseCombiner promiseCombiner = new PromiseCombiner();
InternalPlcRequest request = in.getRequest();
@@ -180,7 +255,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
internalPlcReadRequest.getNamedFields().forEach(field -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
- Integer tdpu = correlationId.getAndIncrement();
+ Integer tdpu = correlationIdGenerator.getAndIncrement();
CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>();
// Important: don't chain to above as we want the above to be completed not the result of when complete
correlatedCompletableFuture
@@ -206,7 +281,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
internalPlcWriteRequest.getNamedFieldTriples().forEach(fieldItemTriple -> {
ChannelPromise subPromise = new DefaultChannelPromise(promise.channel());
- Integer tdpu = correlationId.getAndIncrement();
+ Integer tdpu = correlationIdGenerator.getAndIncrement();
CompletableFuture<InternalPlcResponse> correlatedCompletableFuture = new CompletableFuture<>()
.thenApply(InternalPlcResponse.class::cast)
.whenComplete((internalPlcResponse, throwable) -> {
@@ -244,6 +319,7 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
// Helpers
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ @SuppressWarnings("unchecked")
protected synchronized void trySendingMessages(ChannelHandlerContext ctx) {
while (queue.size() > 0) {
// Get the RequestItem that is up next in the queue.
@@ -274,6 +350,23 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
ctx.flush();
}
+ private void handleTimeout(Timeout timeout, PlcRequestContainer<InternalPlcRequest, InternalPlcResponse<?>> in, Set<Integer> tdpus, long scheduledAt) {
+ if (timeout.isCancelled()) {
+ LOGGER.debug("container {} with timeout {} got canceled", in, timeout);
+ return;
+ }
+ LOGGER.warn("container {} timed out:{}", in, timeout);
+ erroredContainers.incrementAndGet();
+ responsesToBeDelivered.remove(in);
+ containerCorrelationIdMap.remove(in);
+ tdpus.forEach(tdpu -> {
+ erroredItems.incrementAndGet();
+ sentButUnacknowledgedSubContainer.remove(tdpu);
+ correlationToParentContainer.remove(tdpu);
+ });
+ in.getResponseFuture().completeExceptionally(new PlcTimeoutException(System.nanoTime() - scheduledAt));
+ }
+
protected interface CorrelatedPlcRequest extends InternalPlcRequest {
int getTdpu();
@@ -322,14 +415,18 @@ public class SingleItemToSingleRequestProtocol extends ChannelDuplexHandler {
}
// TODO: maybe export to jmx
- public Map<String, Integer> getStatistics() {
- HashMap<String, Integer> statistics = new HashMap<>();
+ public Map<String, Number> getStatistics() {
+ HashMap<String, Number> statistics = new HashMap<>();
statistics.put("queue", queue.size());
statistics.put("sentButUnacknowledgedSubContainer", sentButUnacknowledgedSubContainer.size());
statistics.put("correlationToParentContainer", correlationToParentContainer.size());
statistics.put("containerCorrelationIdMap", containerCorrelationIdMap.size());
statistics.put("responsesToBeDelivered", responsesToBeDelivered.size());
- statistics.put("currentCorrelationId", correlationId.get());
+ statistics.put("correlationIdGenerator", correlationIdGenerator.get());
+ statistics.put("deliveredItems", deliveredItems.get());
+ statistics.put("erroredItems", erroredItems.get());
+ statistics.put("deliveredContainers", deliveredContainers.get());
+ statistics.put("erroredContainers", erroredContainers.get());
return statistics;
}
}
diff --git a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
index 9a1c4ea..1185ace 100644
--- a/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
+++ b/plc4j/protocols/driver-bases/base/src/test/java/org/apache/plc4x/java/base/protocol/SingleItemToSingleRequestProtocolTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any;
@@ -50,7 +51,7 @@ import static org.mockito.Mockito.*;
class SingleItemToSingleRequestProtocolTest implements WithAssertions {
@InjectMocks
- SingleItemToSingleRequestProtocol SUT;
+ SingleItemToSingleRequestProtocol SUT = new SingleItemToSingleRequestProtocol(TimeUnit.SECONDS.toMillis(1), false);
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
ChannelHandlerContext channelHandlerContext;
@@ -83,7 +84,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
entry("correlationToParentContainer", 0),
entry("containerCorrelationIdMap", 0),
entry("responsesToBeDelivered", 0),
- entry("currentCorrelationId", 0)
+ entry("correlationIdGenerator", 0),
+ entry("deliveredItems", 0L),
+ entry("erroredItems", 0L),
+ entry("erroredContainers", 0L),
+ entry("deliveredContainers", 0L)
);
}
@@ -96,7 +101,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
entry("correlationToParentContainer", 0),
entry("containerCorrelationIdMap", 0),
entry("responsesToBeDelivered", 0),
- entry("currentCorrelationId", 0)
+ entry("correlationIdGenerator", 0),
+ entry("deliveredItems", 0L),
+ entry("erroredItems", 0L),
+ entry("deliveredContainers", 0L),
+ entry("erroredContainers", 0L)
);
}
@@ -109,7 +118,11 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
entry("correlationToParentContainer", 0),
entry("containerCorrelationIdMap", 0),
entry("responsesToBeDelivered", 0),
- entry("currentCorrelationId", 0)
+ entry("correlationIdGenerator", 0),
+ entry("deliveredItems", 0L),
+ entry("erroredItems", 0L),
+ entry("deliveredContainers", 0L),
+ entry("erroredContainers", 0L)
);
}
}
@@ -131,14 +144,7 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
// and we simulate that all get responded
verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
- capturedDownstreamContainers.forEach(plcRequestContainer -> {
- InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
- String fieldName = request.getFieldNames().iterator().next();
- CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
- HashMap<String, Pair<PlcResponseCode, FieldItem>> responseFields = new HashMap<>();
- responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(FieldItem.class)));
- responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
- });
+ capturedDownstreamContainers.forEach(this::produceReadResponse);
// Then
// our complete container should complete normally
verify(responseCompletableFuture).complete(any());
@@ -149,9 +155,57 @@ class SingleItemToSingleRequestProtocolTest implements WithAssertions {
entry("correlationToParentContainer", 0),
entry("containerCorrelationIdMap", 0),
entry("responsesToBeDelivered", 0),
- entry("currentCorrelationId", 5)
+ entry("correlationIdGenerator", 5),
+ entry("erroredItems", 0L),
+ entry("deliveredItems", 5L),
+ entry("deliveredContainers", 1L),
+ entry("erroredContainers", 0L)
);
}
+
+ @Test
+ void partialRead() throws Exception {
+ // Given
+ // we have a simple read
+ PlcRequestContainer<TestDefaultPlcReadRequest, InternalPlcResponse> msg = new PlcRequestContainer<>(TestDefaultPlcReadRequest.build(), responseCompletableFuture);
+ // When
+ // we write this
+ SUT.write(channelHandlerContext, msg, channelPromise);
+ // And
+ // and we simulate that some one responded
+ verify(channelHandlerContext, times(5)).write(plcRequestContainerArgumentCaptor.capture(), any());
+ List<PlcRequestContainer> capturedDownstreamContainers = plcRequestContainerArgumentCaptor.getAllValues();
+ capturedDownstreamContainers.stream().findFirst().map(this::produceReadResponse);
+ // Then
+ // We create SUT with 1 seconds timeout
+ TimeUnit.SECONDS.sleep(2);
+ // our complete container should complete normally
+ verify(responseCompletableFuture).completeExceptionally(any());
+ // And we should have no memory leak
+ assertThat(SUT.getStatistics()).containsOnly(
+ entry("queue", 0),
+ entry("sentButUnacknowledgedSubContainer", 0),
+ entry("correlationToParentContainer", 0),
+ entry("containerCorrelationIdMap", 0),
+ entry("responsesToBeDelivered", 0),
+ entry("correlationIdGenerator", 5),
+ entry("deliveredItems", 1L),
+ entry("erroredItems", 4L),
+ entry("deliveredContainers", 0L),
+ entry("erroredContainers", 1L)
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ private Void produceReadResponse(PlcRequestContainer plcRequestContainer) {
+ InternalPlcReadRequest request = (InternalPlcReadRequest) plcRequestContainer.getRequest();
+ String fieldName = request.getFieldNames().iterator().next();
+ CompletableFuture responseFuture = plcRequestContainer.getResponseFuture();
+ HashMap<String, Pair<PlcResponseCode, FieldItem>> responseFields = new HashMap<>();
+ responseFields.put(fieldName, Pair.of(PlcResponseCode.OK, mock(FieldItem.class)));
+ responseFuture.complete(new DefaultPlcReadResponse(request, responseFields));
+ return null;
+ }
}
@Nested