Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/08/10 23:12:58 UTC

[GitHub] [samza] rmatharu opened a new pull request #1411: SAMZA-2577 : Adding support for Async-Logger in Log4j2 Stream Appender

rmatharu opened a new pull request #1411:
URL: https://github.com/apache/samza/pull/1411


   Problem:
   In the StreamAppender for both log4j1 and log4j2, a blocking queue is used to coordinate between the append()-ing threads and a single thread send()-ing to Kafka.
   This is a bounded, blocking, lock-synchronized queue.
   To avoid deadlock scenarios (see SAMZA-1537), the append()-ing threads have a timeout of 2 seconds, after which the log message is discarded and the queue is drained.
   This means that during message bursts, threads calling append() may block for up to 2 seconds at a time and may repeatedly get stuck in this pattern, leading to processing stalls and lower throughput.
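   The queue-plus-timeout pattern described above can be sketched as follows. This is a minimal illustration, not the actual StreamAppender code: the class and method names are hypothetical, and the timeout is in milliseconds only so the example runs quickly.

```java
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "offer with timeout, then drain" between append()-ing
// threads and a single transfer thread. Not the real StreamAppender.
final class BoundedLogQueue {
  private final BlockingQueue<byte[]> queue;
  private final long timeoutMs;

  BoundedLogQueue(int capacity, long timeoutMs) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    this.timeoutMs = timeoutMs;
  }

  /**
   * Called by an append()-ing thread. Blocks for up to timeoutMs while the queue is full.
   * Returns the number of messages dropped (0 if the event was enqueued).
   */
  int append(byte[] serializedEvent) throws InterruptedException {
    if (queue.offer(serializedEvent, timeoutMs, TimeUnit.MILLISECONDS)) {
      return 0;
    }
    // Timed out: discard everything currently queued plus the current event.
    return queue.drainTo(new ArrayList<>()) + 1;
  }

  /** Called by the single transfer thread; blocks until an event is available. */
  byte[] take() throws InterruptedException {
    return queue.take();
  }

  int size() {
    return queue.size();
  }
}
```

   With a 2-second timeout and no consumer keeping up, every append() during a burst can wait the full timeout before the queue is drained, which is the stall described in the problem statement.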
   
   Solutions for Log4j2 
   Solution 1. Enable async loggers in log4j2, which log4j2 supports out of the box (https://logging.apache.org/log4j/2.x/manual/async.html).
   With async loggers, the blocking queue in StreamAppender is not required, because the logger itself is asynchronous, so append() threads can call systemProducer.send() directly.
   However, if async loggers are not used, this queue-based mechanism, which gives the append()-ing threads an "async" illusion, is still required.
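   The handleEvent(...) method in this PR branches on a usingAsyncLogger flag, but how that flag is set is not shown in this thread. One possible check, sketched below as an assumption, looks at the system property log4j2 uses to make all loggers asynchronous (log4j2.contextSelector, or the legacy Log4jContextSelector name). The helper class is hypothetical.

```java
// Hypothetical helper, not the PR's implementation: detects log4j2's "all loggers async"
// mode by inspecting the context selector system property.
final class AsyncLoggerDetection {
  static final String ASYNC_SELECTOR =
      "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector";

  static boolean isAllAsyncSelected() {
    // log4j 2.10+ reads "log4j2.contextSelector"; older releases use "Log4jContextSelector".
    String selector = System.getProperty("log4j2.contextSelector",
        System.getProperty("Log4jContextSelector"));
    return ASYNC_SELECTOR.equals(selector);
  }
}
```

   This only covers the all-async mode configured through system properties; mixed synchronous/asynchronous setups declared with AsyncLogger elements in the configuration file, or selectors set via a log4j2.component.properties file, would not be detected by it.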
   
   Solution 2. Continue using the blocking bounded lock-based queue, but make the queue size and timeout configurable. Users can then tune this to account for message bursts.
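   Solution 2 amounts to reading the queue size and timeout from configuration instead of hard-coding them. A minimal sketch follows; the key names "queueSize" and "queueTimeoutS" and the default queue size of 100 are assumptions for illustration, and only the 2-second default timeout comes from the text above.

```java
import java.util.Map;

// Hypothetical sketch: queue settings parsed from appender configuration with defaults.
final class QueueSettings {
  static final int DEFAULT_QUEUE_SIZE = 100;      // assumed default, for illustration only
  static final long DEFAULT_QUEUE_TIMEOUT_S = 2;  // the 2 s timeout described above

  final int queueSize;
  final long queueTimeoutS;

  private QueueSettings(int queueSize, long queueTimeoutS) {
    if (queueSize <= 0 || queueTimeoutS < 0) {
      throw new IllegalArgumentException("queueSize must be > 0 and queueTimeoutS >= 0");
    }
    this.queueSize = queueSize;
    this.queueTimeoutS = queueTimeoutS;
  }

  static QueueSettings from(Map<String, String> config) {
    int size = Integer.parseInt(
        config.getOrDefault("queueSize", String.valueOf(DEFAULT_QUEUE_SIZE)));
    long timeout = Long.parseLong(
        config.getOrDefault("queueTimeoutS", String.valueOf(DEFAULT_QUEUE_TIMEOUT_S)));
    return new QueueSettings(size, timeout);
  }
}
```

   Users expecting bursts could then raise the queue size (more buffering) or lower the timeout (shorter stalls, more drops), which is the tuning trade-off this option exposes.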
   
   Solution 3. Move to a lock-less queue, e.g., ConcurrentLinkedQueue (unbounded), implement a bounded lock-less queue, or use open-source implementations (https://stackoverflow.com/questions/20890554/lock-free-circular-array).
   Append()-ing threads would no longer need to block or time out. However, since a lock-less queue is non-blocking, the caller may busy-wait, or need to poll at a fixed rate or sleep for a fixed time to avoid busy-waiting.
   Lock-less queues rely on CAS (compare-and-swap) operations instead of locks.
   For log4j2, we will adopt Solution 1.
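   For comparison, the polling pattern that Solution 3 would require can be sketched with ConcurrentLinkedQueue, whose offer() and poll() never block. The class name and the idle-poll limit are hypothetical; the limit exists only so the example terminates, whereas a real transfer thread would loop until interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of Solution 3 (not adopted for log4j2): non-blocking producers,
// and a consumer that sleeps a fixed interval when the queue is empty.
final class LockFreeTransfer {
  private final Queue<byte[]> queue = new ConcurrentLinkedQueue<>();

  /** Producer side: offer() on an unbounded ConcurrentLinkedQueue always succeeds. */
  void append(byte[] event) {
    queue.offer(event);
  }

  /** Consumer side: stops after maxIdlePolls consecutive empty polls. */
  List<byte[]> drain(int maxIdlePolls, long sleepMs) throws InterruptedException {
    List<byte[]> sent = new ArrayList<>();
    int idlePolls = 0;
    while (idlePolls < maxIdlePolls) {
      byte[] event = queue.poll();  // returns null immediately when empty
      if (event == null) {
        idlePolls++;
        TimeUnit.MILLISECONDS.sleep(sleepMs);  // fixed sleep instead of spinning
      } else {
        idlePolls = 0;
        sent.add(event);  // stand-in for systemProducer.send(...)
      }
    }
    return sent;
  }
}
```

   The cost of this design is up to sleepMs of extra latency per event and, because the queue is unbounded, unbounded memory growth if producers consistently outpace the consumer.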


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1411: SAMZA-2577 : Adding support for Async-Logger in Log4j2 Stream Appender

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1411:
URL: https://github.com/apache/samza/pull/1411#discussion_r468862157



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -397,15 +425,7 @@ private void startTransferThread() {
       Runnable transferFromQueueToSystem = () -> {
         while (!Thread.currentThread().isInterrupted()) {
           try {
-            byte[] serializedLogEvent = logQueue.take();
-
-            metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
-            metrics.logMessagesCountSent.inc();
-
-            OutgoingMessageEnvelope outgoingMessageEnvelope =
-                new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
-            systemProducer.send(SOURCE, outgoingMessageEnvelope);
-
+            sendEventToSystemProducer(logQueue.take());

Review comment:
       Now that you have cached keyBytes at the instance level, can we get rid of the local keyBytes byte array?
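   The refactor this comment asks for can be sketched as below: the key is encoded once into a field and reused by every send, so the transfer thread needs no local copy. MessageSender is a hypothetical stand-in for Samza's SystemProducer, and the AtomicLong counters stand in for the logMessagesBytesSent/logMessagesCountSent metrics, so the example compiles without Samza.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for SystemProducer, so the sketch is self-contained.
interface MessageSender {
  void send(byte[] key, byte[] message);
}

final class KeyedEventSender {
  private final byte[] keyBytes;  // encoded once, shared by every send
  private final MessageSender sender;
  final AtomicLong bytesSent = new AtomicLong();  // stand-in for logMessagesBytesSent
  final AtomicLong countSent = new AtomicLong();  // stand-in for logMessagesCountSent

  KeyedEventSender(String key, MessageSender sender) {
    this.keyBytes = key.getBytes(StandardCharsets.UTF_8);
    this.sender = sender;
  }

  void sendEventToSystemProducer(byte[] serializedLogEvent) {
    bytesSent.addAndGet(serializedLogEvent.length);
    countSent.incrementAndGet();
    sender.send(keyBytes, serializedLogEvent);
  }
}
```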

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -239,6 +227,46 @@ public void append(LogEvent event) {
     }
   }
 
+  /**
+   * If async-Logger is enabled, the log-event is sent directly to the systemProducer. Else, the event is serialized
+   * and added to a bounded blocking queue, before returning to the "synchronous" caller.
+   * @param event the log event to append
+   * @throws InterruptedException
+   */
+  private void handleEvent(LogEvent event) throws InterruptedException {
+    if (usingAsyncLogger) {
+      sendEventToSystemProducer(encodeLogEventToBytes(event));
+      return;
+    }
+
+    // Serialize the event before adding to the queue to leverage the caller thread
+    // and ensure that the transferThread can keep up.
+    if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+      // Do NOT retry adding the event to the queue. Dropping the event allows us to alleviate the unlikely
+      // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+      // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+      // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+      // Scenario:
+      // T1: holds L1 and is waiting for L2
+      // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
+
+      // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+      // so dropping events in the StreamAppender is our best recourse.
+
+      // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
+      int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+      log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+          queueTimeoutS,
+          systemStream.toString(),
+          messagesDropped));

Review comment:
       Capturing our offline conversation: "We will need this to be replaced with System.err, as the recursion will drop this message if the lock cannot be acquired within the timeout."
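   A minimal sketch of that suggestion: per the review, a warning emitted through log.warn(...) re-enters the same appender and would be dropped under the very condition being reported, so the message goes to stderr instead. The DropReporter class is hypothetical; the injectable PrintStream exists only to make it testable.

```java
import java.io.PrintStream;

// Hypothetical sketch: report dropped log messages on stderr instead of via the logger,
// so the report does not recurse into the appender whose queue just timed out.
final class DropReporter {
  private final PrintStream err;

  DropReporter(PrintStream err) {
    this.err = err;
  }

  DropReporter() {
    this(System.err);
  }

  void reportDropped(long queueTimeoutS, String systemStream, int messagesDropped) {
    err.println(String.format(
        "Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
        queueTimeoutS, systemStream, messagesDropped));
  }
}
```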







[GitHub] [samza] mynameborat merged pull request #1411: SAMZA-2577 : Adding support for Async-Logger in Log4j2 Stream Appender

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1411:
URL: https://github.com/apache/samza/pull/1411


   

