You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by ld...@apache.org on 2020/11/08 00:50:59 UTC

[plc4x] branch issue/PLC4X-257 created (now 2bb4625)

This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a change to branch issue/PLC4X-257
in repository https://gitbox.apache.org/repos/asf/plc4x.git.


      at 2bb4625  PLC4X-257 Swap anonymous lambdas which static classes.

This branch includes the following new commits:

     new 35853c1  PLC4X-257 Make sure onTimeout is called for missed answers.
     new 2bb4625  PLC4X-257 Swap anonymous lambdas which static classes.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[plc4x] 02/02: PLC4X-257 Swap anonymous lambdas which static classes.

Posted by ld...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch issue/PLC4X-257
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 2bb462549ea2ae660adce2906866c50e7270a499
Author: Łukasz Dywicki <lu...@code-house.org>
AuthorDate: Sun Nov 8 01:23:00 2020 +0100

    PLC4X-257 Swap anonymous lambdas which static classes.
    
    Lambdas which are present in a stack trace have a big impact on debugging capabilities.
    They do not point to the place in the code which is faulty; instead they represent a kind of named state for the code which formed them.
    In order to improve the overall developer experience, swapping to regular classes gives us a possibility to find which `only` call refused a message from further processing.
---
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |  4 +-
 .../spi/internal/DefaultSendRequestContext.java    | 46 +++++++++++++++++++---
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index c199eca..5929ffa 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -160,7 +160,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
                         Predicate predicate = either.get();
                         if (predicate.test(instance) == false) {
                             // We do not match -> cannot handle
-                            logger.trace("Registration {} does not match object {} (currently wrapped to {})", registration, t.getClass().getSimpleName(), instance.getClass().getSimpleName());
+                            logger.trace("Registration {} with predicate {} does not match object {} (currently wrapped to {})", registration, predicate, t.getClass().getSimpleName(), instance.getClass().getSimpleName());
                             continue registrations;
                         }
                     }
@@ -174,7 +174,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
                 return;
             }
         }
-        logger.trace("No registered handler found for message {}, using default decode method", t);
+        logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), t);
         protocolBase.decode(new DefaultConversationContext<>(channelHandlerContext, passive), t);
     }
 
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
index 376c0fd..51141a6 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContext.java
@@ -77,7 +77,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
             throw new ConversationContext.PlcWiringException("can't expect class of type " + clazz + " as we already expecting clazz of type " + expectClazz);
         }
         expectClazz = clazz;
-        commands.addLast(Either.right(clazz::isInstance));
+        commands.addLast(Either.right(new TypePredicate<>(clazz)));
         return this;
     }
 
@@ -123,9 +123,7 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
             throw new ConversationContext.PlcWiringException("expectResponse must be called before first unwrap");
         }
         if (onTimeoutConsumer == null) {
-            onTimeoutConsumer = e -> {
-                // NOOP
-            };
+            onTimeoutConsumer = new NoopTimeoutConsumer();
         }
         commands.addLast(Either.left(unwrapper));
         return new DefaultSendRequestContext<>(commands, timeout, finisher, request, context, expectClazz, packetConsumer, onTimeoutConsumer, errorConsumer);
@@ -133,8 +131,44 @@ public class DefaultSendRequestContext<T> implements ConversationContext.SendReq
 
     @Override
     public <R> ConversationContext.SendRequestContext<R> only(Class<R> clazz) {
-        this.check(clazz::isInstance);
-        return this.unwrap(clazz::cast);
+        this.check(new TypePredicate<>(clazz));
+        return this.unwrap(new CastFunction<>(clazz));
     }
 
+    static class TypePredicate<T, R> implements Predicate<R> {
+
+        private final Class<T> type;
+
+        TypePredicate(Class<T> type) {
+            this.type = type;
+        }
+
+        @Override
+        public boolean test(R value) {
+            return type.isInstance(value);
+        }
+    }
+
+    static class CastFunction<T, R> implements Function<R, T> {
+
+        private final Class<T> type;
+
+        CastFunction(Class<T> type) {
+            this.type = type;
+        }
+
+        @Override
+        public T apply(R value) {
+            return type.cast(value);
+        }
+
+    }
+
+    static class NoopTimeoutConsumer implements Consumer<TimeoutException> {
+
+        @Override
+        public void accept(TimeoutException e) {
+
+        }
+    }
 }


[plc4x] 01/02: PLC4X-257 Make sure onTimeout is called for missed answers.

Posted by ld...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch issue/PLC4X-257
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 35853c195b7305ae86b7d618844ccf17d6087f9d
Author: Łukasz Dywicki <lu...@code-house.org>
AuthorDate: Sun Nov 8 01:17:16 2020 +0100

    PLC4X-257 Make sure onTimeout is called for missed answers.
---
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |   3 +
 .../plc4x/java/spi/Plc4xNettyWrapperTest.java      | 103 +++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index 6955d39..c199eca 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -42,6 +42,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -139,6 +140,8 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
             // Timeout?
             if (registration.getTimeout().isBefore(Instant.now())) {
                 logger.debug("Removing {} as its timed out (was set till {})", registration, registration.getTimeout());
+                // pass timeout back to caller so it can do ie. transaction compensation
+                registration.getOnTimeoutConsumer().accept(new TimeoutException());
                 iter.remove();
                 continue;
             }
diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
new file mode 100644
index 0000000..bc62d9e
--- /dev/null
+++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.spi;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import org.apache.plc4x.java.spi.events.ConnectEvent;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class Plc4xNettyWrapperTest {
+
+    @Mock
+    Plc4xProtocolBase<Date> protocol;
+    @Mock
+    ChannelPipeline channelPipeline;
+    @Mock
+    ChannelHandlerContext channelHandlerContext;
+    @Mock
+    Channel channel;
+
+    Plc4xNettyWrapper<Date> wrapper;
+
+    ConversationContext<Date> conversationContext;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        wrapper = new Plc4xNettyWrapper<>(channelPipeline, false, protocol, Date.class);
+
+        ArgumentCaptor<ConversationContext<Date>> captor = ArgumentCaptor.forClass(ConversationContext.class);
+        doNothing().when(protocol).onConnect(captor.capture());
+
+        when(channelHandlerContext.channel()).thenReturn(channel);
+
+        wrapper.userEventTriggered(channelHandlerContext, new ConnectEvent());
+        conversationContext = captor.getValue();
+    }
+
+    @Test // see PLC4X-257
+    void conversationTimeoutTest() throws Exception {
+        AtomicBoolean timeout = new AtomicBoolean(false);
+        AtomicBoolean handled = new AtomicBoolean(false);
+        AtomicBoolean error = new AtomicBoolean(false);
+
+        ConversationContext.ContextHandler handler = conversationContext.sendRequest(new Date())
+            .expectResponse(Date.class, Duration.ofMillis(500))
+            .onTimeout(e -> {
+                timeout.set(true);
+            })
+            .onError((value, throwable) -> {
+                error.set(true);
+            })
+            .handle((answer) -> {
+                handled.set(true);
+            });
+
+        Thread.sleep(750);
+        assertFalse(timeout.get(), "timeout");
+        assertFalse(handled.get(), "handled");
+        assertFalse(error.get(), "error");
+
+        wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>());
+
+        assertTrue(timeout.get());
+
+        assertTrue(timeout.get(), "timeout");
+        assertFalse(handled.get(), "handled");
+        assertFalse(error.get(), "error");
+
+    }
+}
\ No newline at end of file