You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/30 11:31:49 UTC
[camel] branch master updated: CAMEL-15718: Camel lumberjack server
component not thread safe (#4540)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 68c454b CAMEL-15718: Camel lumberjack server component not thread safe (#4540)
68c454b is described below
commit 68c454b023ea0bf5f342ec0cca8b57ba71490924
Author: Zineb BENDHIBA <be...@gmail.com>
AuthorDate: Fri Oct 30 12:31:23 2020 +0100
CAMEL-15718: Camel lumberjack server component not thread safe (#4540)
---
.../lumberjack/io/LumberjackSessionHandler.java | 16 ++++++++-
.../LumberjackComponentGlobalSSLTest.java | 8 +++--
.../lumberjack/LumberjackComponentSSLTest.java | 7 ++--
.../lumberjack/LumberjackComponentTest.java | 8 +++--
.../lumberjack/LumberjackDisconnectionTest.java | 5 ++-
...entTest.java => LumberjackMultiThreadTest.java} | 38 +++++++++++++++++++---
.../camel/component/lumberjack/LumberjackUtil.java | 10 +++---
.../io/LumberjackChannelInitializerTest.java | 9 +++++
8 files changed, 79 insertions(+), 22 deletions(-)
diff --git a/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java b/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
index bf9b0b1..7e49d7e 100644
--- a/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
+++ b/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.lumberjack.io;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
@@ -36,6 +39,8 @@ final class LumberjackSessionHandler {
private volatile int version = -1;
private volatile int windowSize = 1;
private volatile int nextAck = ACK_UNSET;
+ // this phaser will handle one window processed at a time for each session
+ private final Phaser phaser = new Phaser();
void versionRead(int version) {
if (this.version == -1) {
@@ -51,6 +56,10 @@ final class LumberjackSessionHandler {
void windowSizeRead(int windowSize) {
LOG.debug("Lumberjack window size is {}", windowSize);
+ // register a new window process
+ phaser.register();
+ // if another window is being processed in the same session, wait until it ends processing
+ phaser.arriveAndAwaitAdvance();
this.windowSize = windowSize;
nextAck = ACK_UNSET;
}
@@ -66,7 +75,12 @@ final class LumberjackSessionHandler {
response.writeByte(version);
response.writeByte(TYPE_ACKNOWLEDGE);
response.writeInt(sequenceNumber);
- ctx.writeAndFlush(response);
+ // pause before send, in order to make sure all ACK are received by the client
+ ctx.executor().schedule(() -> {
+ ctx.writeAndFlush(response);
+ }, 10, TimeUnit.MILLISECONDS);
+ // prepare to read another pending window
+ phaser.arriveAndDeregister();
}
}
}
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java
index 3ae3b30..32b9ab5 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentGlobalSSLTest.java
@@ -67,17 +67,19 @@ public class LumberjackComponentGlobalSSLTest extends CamelTestSupport {
// We're expecting 25 messages with Maps
MockEndpoint mock = getMockEndpoint("mock:output");
- mock.expectedMessageCount(25);
+ mock.expectedMessageCount(60);
mock.allMessages().body().isInstanceOf(Map.class);
+ List<Integer> windows = Arrays.asList(15, 10, 15, 10, 10);
+
// When sending messages
- List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters());
+ List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters(), windows);
// Then we should have the messages we're expecting
mock.assertIsSatisfied();
// And we should have replied with 2 acknowledgments for each window frame
- assertEquals(Arrays.asList(10, 15), responses);
+ assertEquals(windows, responses);
}
/**
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java
index 8251269..b4e7b4c 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentSSLTest.java
@@ -58,17 +58,18 @@ public class LumberjackComponentSSLTest extends CamelTestSupport {
public void shouldListenToMessagesOverSSL() throws Exception {
// We're expecting 25 messages with Maps
MockEndpoint mock = getMockEndpoint("mock:output");
- mock.expectedMessageCount(25);
+ mock.expectedMessageCount(60);
mock.allMessages().body().isInstanceOf(Map.class);
+ List<Integer> windows = Arrays.asList(15, 10, 15, 10, 10);
// When sending messages
- List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters());
+ List<Integer> responses = LumberjackUtil.sendMessages(port, createClientSSLContextParameters(), windows);
// Then we should have the messages we're expecting
mock.assertIsSatisfied();
// And we should have replied with 2 acknowledgments for each window frame
- assertEquals(Arrays.asList(10, 15), responses);
+ assertEquals(windows, responses);
}
/**
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
index 728746c..7c07fcf 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
@@ -51,11 +51,13 @@ public class LumberjackComponentTest extends CamelTestSupport {
public void shouldListenToMessages() throws Exception {
// We're expecting 25 messages with Maps
MockEndpoint mock = getMockEndpoint("mock:output");
- mock.expectedMessageCount(25);
+ mock.expectedMessageCount(60);
mock.allMessages().body().isInstanceOf(Map.class);
+ List<Integer> windows = Arrays.asList(15, 10, 15, 10, 10);
+
// When sending messages
- List<Integer> responses = LumberjackUtil.sendMessages(port, null);
+ List<Integer> responses = LumberjackUtil.sendMessages(port, null, windows);
// Then we should have the messages we're expecting
mock.assertIsSatisfied();
@@ -67,6 +69,6 @@ public class LumberjackComponentTest extends CamelTestSupport {
first.get("source"));
// And we should have replied with 2 acknowledgments for each window frame
- assertEquals(Arrays.asList(10, 15), responses);
+ assertEquals(windows, responses);
}
}
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java
index 7582a0b..d070809 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackDisconnectionTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.lumberjack;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -56,8 +57,10 @@ public class LumberjackDisconnectionTest extends CamelTestSupport {
mock.expectedMessageCount(3);
mock.allMessages().body().isInstanceOf(Map.class);
+ List<Integer> windows = Arrays.asList(15, 10);
+
// When sending messages
- List<Integer> responses = LumberjackUtil.sendMessages(port, null);
+ List<Integer> responses = LumberjackUtil.sendMessages(port, null, windows);
// Then we should have the messages we're expecting
mock.assertIsSatisfied();
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackMultiThreadTest.java
similarity index 70%
copy from components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
copy to components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackMultiThreadTest.java
index 728746c..e2305f6 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackComponentTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackMultiThreadTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.camel.component.lumberjack;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -29,7 +31,8 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class LumberjackComponentTest extends CamelTestSupport {
+public class LumberjackMultiThreadTest extends CamelTestSupport {
+
private static int port;
@BeforeAll
@@ -51,11 +54,20 @@ public class LumberjackComponentTest extends CamelTestSupport {
public void shouldListenToMessages() throws Exception {
// We're expecting 25 messages with Maps
MockEndpoint mock = getMockEndpoint("mock:output");
- mock.expectedMessageCount(25);
+ mock.expectedMessageCount(125);
mock.allMessages().body().isInstanceOf(Map.class);
// When sending messages
- List<Integer> responses = LumberjackUtil.sendMessages(port, null);
+ List<Integer> windows = Arrays.asList(15, 10);
+
+ // create 5 threads
+ List<LumberjackThreadTest> threads = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ threads.add(new LumberjackThreadTest());
+ }
+
+ // sending messages on 5 parallel sessions
+ threads.stream().forEach(thread -> thread.start());
// Then we should have the messages we're expecting
mock.assertIsSatisfied();
@@ -66,7 +78,23 @@ public class LumberjackComponentTest extends CamelTestSupport {
assertEquals("/home/qatest/collectNetwork/log/data-integration/00000000-f000-0000-1541-8da26f200001/absorption.log",
first.get("source"));
- // And we should have replied with 2 acknowledgments for each window frame
- assertEquals(Arrays.asList(10, 15), responses);
+ TimeUnit.MILLISECONDS.sleep(2000);
+
+ // And we should have replied with 2 acknowledgments for each session frame
+ threads.stream().forEach(thread -> assertEquals(windows, thread.responses));
+ }
+
+ class LumberjackThreadTest extends Thread {
+ private List<Integer> responses;
+
+ @Override
+ public void run() {
+ try {
+ this.responses = LumberjackUtil.sendMessages(port, null, Arrays.asList(15, 10));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
}
}
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java
index b6e7a6d..721c57f 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/LumberjackUtil.java
@@ -44,7 +44,8 @@ final class LumberjackUtil {
private LumberjackUtil() {
}
- static List<Integer> sendMessages(int port, SSLContextParameters sslContextParameters) throws InterruptedException {
+ static List<Integer> sendMessages(int port, SSLContextParameters sslContextParameters, List<Integer> windows)
+ throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// This list will hold the acknowledgment response sequence numbers
@@ -82,11 +83,8 @@ final class LumberjackUtil {
.handler(initializer) //
.connect("127.0.0.1", port).sync().channel(); //
- // Send the 2 window frames
- TimeUnit.MILLISECONDS.sleep(500);
- channel.writeAndFlush(readSample("io/window10"));
- TimeUnit.MILLISECONDS.sleep(500);
- channel.writeAndFlush(readSample("io/window15"));
+ // send 5 frame windows, without pausing
+ windows.stream().forEach(window -> channel.writeAndFlush(readSample(String.format("io/window%s", window))));
TimeUnit.MILLISECONDS.sleep(500);
channel.close();
diff --git a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java
index 859190c..9f2532c 100644
--- a/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java
+++ b/components/camel-lumberjack/src/test/java/org/apache/camel/component/lumberjack/io/LumberjackChannelInitializerTest.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
@@ -46,6 +47,14 @@ public class LumberjackChannelInitializerTest {
writeResourceBytePerByte(channel, "window10");
writeResourceBytePerByte(channel, "window15");
+ // EmbeddedChannel is no "real" Channel implementation and mainly use-able for testing and embedded ChannelHandlers
+ // since now we are executing scheduled writeAndFlush for parallel messages within a single session
+ // we need to use runPendingTasks for this type of Channel
+ // this is use case for internal camel code test only : other unit tests use production like channels and don't need
+ // adding runPendingTasks()
+ TimeUnit.MILLISECONDS.sleep(2000);
+ channel.runPendingTasks();
+
// Then we must have 25 messages with only maps
assertEquals(25, messages.size());