You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2022/11/17 03:48:32 UTC
[samza] branch master updated: LISAMZA-27395 removing the current recursive call prevention logic (#1641)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d5d3d989b LISAMZA-27395 removing the current recursive call prevention logic (#1641)
d5d3d989b is described below
commit d5d3d989b0b97b55e573b1c895156238b4599d29
Author: jia-gao <94...@users.noreply.github.com>
AuthorDate: Wed Nov 16 19:48:25 2022 -0800
LISAMZA-27395 removing the current recursive call prevention logic (#1641)
* LISAMZA-27395 removing the current recursive call prevention logic since it doesn’t work as expected
* Add unit tests for streamappender change
---
.../samza/logging/log4j2/StreamAppender.java | 52 ++++++----------
.../logging/log4j2/StreamAppenderMetrics.java | 4 --
.../samza/logging/log4j2/TestStreamAppender.java | 69 ++++++++++++++++++++++
3 files changed, 88 insertions(+), 37 deletions(-)
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index c369282b8..42c62ca5b 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
@@ -85,12 +84,6 @@ public class StreamAppender extends AbstractAppender {
private String streamName = null;
private final boolean usingAsyncLogger;
private final LoggingContextHolder loggingContextHolder;
-
- /**
- * used to detect if this thread is called recursively
- */
- private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
-
protected static final int DEFAULT_QUEUE_SIZE = 100;
protected volatile boolean systemInitialized = false;
protected StreamAppenderMetrics metrics;
@@ -189,37 +182,30 @@ public class StreamAppender extends AbstractAppender {
@Override
public void append(LogEvent event) {
- if (!recursiveCall.get()) {
- try {
- recursiveCall.set(true);
- if (!systemInitialized) {
- // configs are needed to set up producer system, so check that before actually initializing
- if (this.loggingContextHolder.getConfig() != null) {
- synchronized (this) {
- if (!systemInitialized) {
- setupSystem();
- systemInitialized = true;
- }
+ try {
+ if (!systemInitialized) {
+ // configs are needed to set up producer system, so check that before actually initializing
+ if (this.loggingContextHolder.getConfig() != null) {
+ synchronized (this) {
+ if (!systemInitialized) {
+ setupSystem();
+ systemInitialized = true;
}
- handleEvent(event);
- } else {
- // skip sending the log to the stream if initialization can't happen yet
- System.out.println("Waiting for config to become available before log can be handled");
}
- } else {
handleEvent(event);
+ } else {
+ // skip sending the log to the stream if initialization can't happen yet
+ System.out.println("Waiting for config to become available before log can be handled");
}
- } catch (Exception e) {
- if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
- metrics.logMessagesErrors.inc();
- }
- System.err.println(String.format("[%s] Error sending log message:", getName()));
- e.printStackTrace();
- } finally {
- recursiveCall.set(false);
+ } else {
+ handleEvent(event);
+ }
+ } catch (Exception e) {
+ if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
+ metrics.logMessagesErrors.inc();
}
- } else if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
- metrics.recursiveCalls.inc();
+ System.err.println(String.format("[%s] Error sending log message:", getName()));
+ e.printStackTrace();
}
}
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
index 466a520bd..e8e784b08 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
@@ -28,9 +28,6 @@ public class StreamAppenderMetrics extends MetricsBase {
/** The percentage of the log queue capacity that is currently filled with messages from 0 to 100. */
public final Gauge<Integer> bufferFillPct;
- /** The number of recursive calls to the StreamAppender. These events will not be logged. */
- public final Counter recursiveCalls;
-
/** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. */
public final Counter logMessagesDropped;
@@ -46,7 +43,6 @@ public class StreamAppenderMetrics extends MetricsBase {
public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
super(prefix + "-", registry);
bufferFillPct = newGauge("buffer-fill-percent", 0);
- recursiveCalls = newCounter("recursive-calls");
logMessagesDropped = newCounter("log-messages-dropped");
logMessagesErrors = newCounter("log-messages-errors");
logMessagesBytesSent = newCounter("log-messages-bytes-sent");
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 74437babf..495022f5e 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableMap;
@@ -375,6 +377,73 @@ public class TestStreamAppender {
}
}
+ @Test
+ public void testLogConcurrently() throws InterruptedException {
+ System.setProperty("samza.container.name", "samza-container-1");
+
+ PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
+ StreamAppender streamAppender =
+ new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder);
+ startAndAttachAppender(streamAppender);
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ messages.add("testing" + i);
+ }
+ logConcurrentlyAndVerifyMessages(messages);
+ streamAppender.stop();
+ }
+
+ @Test
+ public void testLogRecursively() throws InterruptedException {
+ System.setProperty("samza.container.name", "samza-container-1");
+
+ PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
+ StreamAppender streamAppender =
+ new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder);
+ startAndAttachAppender(streamAppender);
+ List<String> messages = Lists.newArrayList("testing1", "testing2");
+ logRecursivelyAndVerifyMessages(messages);
+ streamAppender.stop();
+ }
+
+ private void logConcurrentlyAndVerifyMessages(List<String> messages) throws InterruptedException {
+ // Set up latch
+ final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+ MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+ ExecutorService service = Executors.newFixedThreadPool(10);
+
+ // Log the messages concurrently
+ for (String message : messages) {
+ service.submit(() -> {
+ LOG.info(message);
+ });
+ }
+ // Wait for messages
+ assertTrue("Timeout while waiting for StreamAppender to send all messages. Count: " + allMessagesSent.getCount(),
+ allMessagesSent.await(60, TimeUnit.SECONDS));
+
+ // MockSystemProducer.messagesReceived is not thread safe, verify allMessagesSent CountDownLatch instead
+ assertEquals(0, allMessagesSent.getCount());
+ service.shutdown();
+ }
+
+ private void logRecursivelyAndVerifyMessages(List<String> messages) throws InterruptedException {
+ // Set up latch
+ final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+ MockSystemProducer.listeners.add((source, envelope) -> {
+ LOG.info("system producer invoked");
+ allMessagesSent.countDown();
+ });
+ // Log the messages
+ messages.forEach(LOG::info);
+ // Wait for messages
+ assertTrue("Timeout while waiting for StreamAppender to send all messages. Count: " + allMessagesSent.getCount(),
+ allMessagesSent.await(60, TimeUnit.SECONDS));
+
+ // Verify
+ assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
+ }
+
private static Config baseConfig() {
Map<String, String> map = new HashMap<>();
map.put("job.name", "log4jTest");