You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/30 11:31:49 UTC

[camel] branch master updated: CAMEL-15718: Camel lumberjack server component not thread safe (#4540)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 68c454b  CAMEL-15718: Camel lumberjack server component not thread safe (#4540)
68c454b is described below

commit 68c454b023ea0bf5f342ec0cca8b57ba71490924
Author: Zineb BENDHIBA <be...@gmail.com>
AuthorDate: Fri Oct 30 12:31:23 2020 +0100

    CAMEL-15718: Camel lumberjack server component not thread safe (#4540)
---
 .../lumberjack/io/LumberjackSessionHandler.java    | 16 ++++++++-
 .../LumberjackComponentGlobalSSLTest.java          |  8 +++--
 .../lumberjack/LumberjackComponentSSLTest.java     |  7 ++--
 .../lumberjack/LumberjackComponentTest.java        |  8 +++--
 .../lumberjack/LumberjackDisconnectionTest.java    |  5 ++-
 ...entTest.java => LumberjackMultiThreadTest.java} | 38 +++++++++++++++++++---
 .../camel/component/lumberjack/LumberjackUtil.java | 10 +++---
 .../io/LumberjackChannelInitializerTest.java       |  9 +++++
 8 files changed, 79 insertions(+), 22 deletions(-)

diff --git a/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java b/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
index bf9b0b1..7e49d7e 100644
--- a/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
+++ b/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.lumberjack.io;
 
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
@@ -36,6 +39,8 @@ final class LumberjackSessionHandler {
     private volatile int version = -1;
     private volatile int windowSize = 1;
     private volatile int nextAck = ACK_UNSET;
+    // this phaser will handle one window processed at a time for each session
+    private final Phaser phaser = new Phaser();
 
     void versionRead(int version) {
         if (this.version == -1) {
@@ -51,6 +56,10 @@ final class LumberjackSessionHandler {
 
     void windowSizeRead(int windowSize) {
         LOG.debug("Lumberjack window size is {}", windowSize);
+        // register a new window process
+        phaser.register();
+        // if another window is being processed in the same session, wait until it ends processing
+        phaser.arriveAndAwaitAdvance();
         this.windowSize = windowSize;
         nextAck = ACK_UNSET;
     }
@@ -66,7 +75,12 @@ final class LumberjackSessionHandler {
             response.writeByte(version);
             response.writeByte(TYPE_ACKNOWLEDGE);
             response.writeInt(sequenceNumber);
-            ctx.writeAndFlush(response);
+            // pause before send, in order to make sure all ACK are received by the client
+            ctx.executor().schedule(() -> {
+                ctx.writeAndFlush(response);
+            }, 10, TimeUnit.MILLISECONDS);
+            // prepare to read another pending window
+            phaser.arriveAndDeregister();
         }
     }
 }
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java
index 3ae3b30..32b9ab5 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java
@@ -67,17 +67,19 @@ public class LumberjackComponentGlobalSSLTest extends CamelTestSupport {
 
         // We're expecting 25 messages with Maps
         MockEndpoint mock = getMockEndpoint("mock:output");
-        mock.expectedMessageCount(25);
+        mock.expectedMessageCount(60);
         mock.allMessages().body().isInstanceOf(Map.class);
 
+        List<Integer> windows = Arrays.asList(15, 10, 15, 10, 10);
+
         // When sending messages
-        List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters());
+        List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters(), windows);
 
         // Then we should have the messages we're expecting
         mock.assertIsSatisfied();
 
         // And we should have replied with 2 acknowledgments for each window frame
-        assertEquals(Arrays.asList(10, 15), responses);
+        assertEquals(windows, responses);
     }
 
     /**
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java
index 8251269..b4e7b4c 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java
@@ -58,17 +58,18 @@ public class LumberjackComponentSSLTest extends CamelTestSupport {
     public void shouldListenToMessagesOverSSL() throws Exception {
         // We're expecting 25 messages with Maps
         MockEndpoint mock = getMockEndpoint("mock:output");
-        mock.expectedMessageCount(25);
+        mock.expectedMessageCount(60);
         mock.allMessages().body().isInstanceOf(Map.class);
+        List<Integer> windows = Arrays.asList(15, 10, 15, 10, 10);
 
         // When sending messages
-        List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters());
+        List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters(), windows);
 
         // Then we should have the messages we're expecting
         mock.assertIsSatisfied();
 
         // And we should have replied with 2 acknowledgments for each window frame
-        assertEquals(Arrays.asList(10, 15), responses);
+        assertEquals(windows, responses);
     }
 
     /**
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
index 728746c..7c07fcf 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
@@ -51,11 +51,13 @@ public class LumberjackComponentTest extends CamelTestSupport {
     public void shouldListenToMessages() throws Exception {
         // We're expecting 25 messages with Maps
         MockEndpoint mock = getMockEndpoint("mock:output");
-        mock.expectedMessageCount(25);
+        mock.expectedMessageCount(60);
         mock.allMessages().body().isInstanceOf(Map.class);
 
+        List<Integer> windows = Arrays.asList(15, 10, 15, 10, 10);
+
         // When sending messages
-        List<Integer> responses = LumberjackUtil.sendMessages(port, null);
+        List<Integer> responses = LumberjackUtil.sendMessages(port, null, windows);
 
         // Then we should have the messages we're expecting
         mock.assertIsSatisfied();
@@ -67,6 +69,6 @@ public class LumberjackComponentTest extends CamelTestSupport {
                 first.get("source"));
 
         // And we should have replied with 2 acknowledgments for each window frame
-        assertEquals(Arrays.asList(10, 15), responses);
+        assertEquals(windows, responses);
     }
 }
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java
index 7582a0b..d070809 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.lumberjack;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -56,8 +57,10 @@ public class LumberjackDisconnectionTest extends CamelTestSupport {
         mock.expectedMessageCount(3);
         mock.allMessages().body().isInstanceOf(Map.class);
 
+        List<Integer> windows = Arrays.asList(15, 10);
+
         // When sending messages
-        List<Integer> responses = LumberjackUtil.sendMessages(port, null);
+        List<Integer> responses = LumberjackUtil.sendMessages(port, null, windows);
 
         // Then we should have the messages we're expecting
         mock.assertIsSatisfied();
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackMultiThreadTest.java
similarity index 70%
copy from components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
copy to components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackMultiThreadTest.java
index 728746c..e2305f6 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackMultiThreadTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.component.lumberjack;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -29,7 +31,8 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class LumberjackComponentTest extends CamelTestSupport {
+public class LumberjackMultiThreadTest extends CamelTestSupport {
+
     private static int port;
 
     @BeforeAll
@@ -51,11 +54,20 @@ public class LumberjackComponentTest extends CamelTestSupport {
     public void shouldListenToMessages() throws Exception {
         // We're expecting 25 messages with Maps
         MockEndpoint mock = getMockEndpoint("mock:output");
-        mock.expectedMessageCount(25);
+        mock.expectedMessageCount(125);
         mock.allMessages().body().isInstanceOf(Map.class);
 
         // When sending messages
-        List<Integer> responses = LumberjackUtil.sendMessages(port, null);
+        List<Integer> windows = Arrays.asList(15, 10);
+
+        // create 5 threads
+        List<LumberjackThreadTest> threads = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            threads.add(new LumberjackThreadTest());
+        }
+
+        // sending messages on 5 parallel sessions
+        threads.stream().forEach(thread -> thread.start());
 
         // Then we should have the messages we're expecting
         mock.assertIsSatisfied();
@@ -66,7 +78,23 @@ public class LumberjackComponentTest extends CamelTestSupport {
         assertEquals("/home/qatest/collectNetwork/log/data-integration/00000000-f000-0000-1541-8da26f200001/absorption.log",
                 first.get("source"));
 
-        // And we should have replied with 2 acknowledgments for each window frame
-        assertEquals(Arrays.asList(10, 15), responses);
+        TimeUnit.MILLISECONDS.sleep(2000);
+
+        // And we should have replied with 2 acknowledgments for each session frame
+        threads.stream().forEach(thread -> assertEquals(windows, thread.responses));
+    }
+
+    class LumberjackThreadTest extends Thread {
+        private List<Integer> responses;
+
+        @Override
+        public void run() {
+            try {
+                this.responses = LumberjackUtil.sendMessages(port, null, Arrays.asList(15, 10));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
     }
 }
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java
index b6e7a6d..721c57f 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java
@@ -44,7 +44,8 @@ final class LumberjackUtil {
     private LumberjackUtil() {
     }
 
-    static List<Integer> sendMessages(int port, SSLContextParameters sslContextParameters) throws InterruptedException {
+    static List<Integer> sendMessages(int port, SSLContextParameters sslContextParameters, List<Integer> windows)
+            throws InterruptedException {
         NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         try {
             // This list will hold the acknowledgment response sequence numbers
@@ -82,11 +83,8 @@ final class LumberjackUtil {
                     .handler(initializer)                         //
                     .connect("127.0.0.1", port).sync().channel(); //
 
-            // Send the 2 window frames
-            TimeUnit.MILLISECONDS.sleep(500);
-            channel.writeAndFlush(readSample("io/window10"));
-            TimeUnit.MILLISECONDS.sleep(500);
-            channel.writeAndFlush(readSample("io/window15"));
+            // send 5 frame windows, without pausing
+            windows.stream().forEach(window -> channel.writeAndFlush(readSample(String.format("io/window%s", window))));
             TimeUnit.MILLISECONDS.sleep(500);
 
             channel.close();
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java
index 859190c..9f2532c 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -46,6 +47,14 @@ public class LumberjackChannelInitializerTest {
         writeResourceBytePerByte(channel, "window10");
         writeResourceBytePerByte(channel, "window15");
 
+        // EmbeddedChannel is no "real" Channel implementation and mainly use-able for testing and embedded ChannelHandlers
+        // since now we are executing scheduled writeAndFlush for parallel messages within a single session
+        // we need to use runPendingTasks for this type of Channel
+        // this is use case for internal camel code test only : other unit tests use production like channels and don't need
+        // adding runPendingTasks()
+        TimeUnit.MILLISECONDS.sleep(2000);
+        channel.runPendingTasks();
+
         // Then we must have 25 messages with only maps
         assertEquals(25, messages.size());