You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/11/12 00:35:09 UTC

[plc4x] 02/02: PLC4X-207 Make sure onTimeout is called for missed answers.

This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 2adf651c6c3deca0a03b702ad5635bbbeb6405b5
Author: Ɓukasz Dywicki <lu...@code-house.org>
AuthorDate: Sun Nov 8 01:17:16 2020 +0100

    PLC4X-207 Make sure onTimeout is called for missed answers.
---
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |   3 +
 .../plc4x/java/spi/Plc4xNettyWrapperTest.java      | 103 +++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index dba2566..b459559 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -42,6 +42,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -139,6 +140,8 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
             // Timeout?
             if (registration.getTimeout().isBefore(Instant.now())) {
                 logger.debug("Removing {} as its timed out (was set till {})", registration, registration.getTimeout());
+                // pass timeout back to caller so it can do ie. transaction compensation
+                registration.getOnTimeoutConsumer().accept(new TimeoutException());
                 iter.remove();
                 continue;
             }
diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
new file mode 100644
index 0000000..7181f2c
--- /dev/null
+++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.spi;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import org.apache.plc4x.java.spi.events.ConnectEvent;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class Plc4xNettyWrapperTest {
+
+    @Mock
+    Plc4xProtocolBase<Date> protocol;
+    @Mock
+    ChannelPipeline channelPipeline;
+    @Mock
+    ChannelHandlerContext channelHandlerContext;
+    @Mock
+    Channel channel;
+
+    Plc4xNettyWrapper<Date> wrapper;
+
+    ConversationContext<Date> conversationContext;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        wrapper = new Plc4xNettyWrapper<>(channelPipeline, false, protocol, Date.class);
+
+        ArgumentCaptor<ConversationContext<Date>> captor = ArgumentCaptor.forClass(ConversationContext.class);
+        doNothing().when(protocol).onConnect(captor.capture());
+
+        when(channelHandlerContext.channel()).thenReturn(channel);
+
+        wrapper.userEventTriggered(channelHandlerContext, new ConnectEvent());
+        conversationContext = captor.getValue();
+    }
+
+    @Test // see PLC4X-207 / PLC4X-257
+    void conversationTimeoutTest() throws Exception {
+        AtomicBoolean timeout = new AtomicBoolean(false);
+        AtomicBoolean handled = new AtomicBoolean(false);
+        AtomicBoolean error = new AtomicBoolean(false);
+
+        ConversationContext.ContextHandler handler = conversationContext.sendRequest(new Date())
+            .expectResponse(Date.class, Duration.ofMillis(500))
+            .onTimeout(e -> {
+                timeout.set(true);
+            })
+            .onError((value, throwable) -> {
+                error.set(true);
+            })
+            .handle((answer) -> {
+                handled.set(true);
+            });
+
+        Thread.sleep(750);
+        assertFalse(timeout.get(), "timeout");
+        assertFalse(handled.get(), "handled");
+        assertFalse(error.get(), "error");
+
+        wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
+
+        assertTrue(timeout.get());
+
+        assertTrue(timeout.get(), "timeout");
+        assertFalse(handled.get(), "handled");
+        assertFalse(error.get(), "error");
+
+    }
+}
\ No newline at end of file