Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/08/11 21:55:50 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1411: SAMZA-2577 : Adding support for Async-Logger in Log4j2 Stream Appender

mynameborat commented on a change in pull request #1411:
URL: https://github.com/apache/samza/pull/1411#discussion_r468862157



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -397,15 +425,7 @@ private void startTransferThread() {
       Runnable transferFromQueueToSystem = () -> {
         while (!Thread.currentThread().isInterrupted()) {
           try {
-            byte[] serializedLogEvent = logQueue.take();
-
-            metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
-            metrics.logMessagesCountSent.inc();
-
-            OutgoingMessageEnvelope outgoingMessageEnvelope =
-                new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
-            systemProducer.send(SOURCE, outgoingMessageEnvelope);
-
+            sendEventToSystemProducer(logQueue.take());

Review comment:
       Now that you have cached keyBytes at the instance level, can we get rid of the local keyBytes byte array?
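       A minimal sketch of what this suggestion could look like, assuming the key is computed once and stored in a field. The class name KeyBytesSketch, the sent list, and the constructor argument are hypothetical stand-ins, not code from the PR; only keyBytes and sendEventToSystemProducer are names taken from the diff.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamAppender; only the key handling is modelled.
class KeyBytesSketch {
  // Cached once at the instance level, as the review comment describes.
  private final byte[] keyBytes;

  // Records {key, payload} pairs; stands in for systemProducer.send(...).
  private final List<byte[][]> sent = new ArrayList<>();

  KeyBytesSketch(String key) {
    this.keyBytes = key.getBytes(StandardCharsets.UTF_8);
  }

  // Reads the field directly, so callers such as the transfer thread
  // no longer need their own local keyBytes array.
  void sendEventToSystemProducer(byte[] serializedLogEvent) {
    sent.add(new byte[][] {keyBytes, serializedLogEvent});
  }

  List<byte[][]> sent() {
    return sent;
  }

  public static void main(String[] args) {
    KeyBytesSketch sketch = new KeyBytesSketch("container-1");
    sketch.sendEventToSystemProducer("first".getBytes(StandardCharsets.UTF_8));
    sketch.sendEventToSystemProducer("second".getBytes(StandardCharsets.UTF_8));
    System.out.println("envelopes recorded: " + sketch.sent().size());
  }
}
```

       With the key held in a single field, both the async-logger path and the transfer thread can go through the same helper and share one array.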

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -239,6 +227,46 @@ public void append(LogEvent event) {
     }
   }
 
+  /**
+   * If async-Logger is enabled, the log-event is sent directly to the systemProducer. Else, the event is serialized
+   * and added to a bounded blocking queue, before returning to the "synchronous" caller.
+   * @param event the log event to append
+   * @throws InterruptedException
+   */
+  private void handleEvent(LogEvent event) throws InterruptedException {
+    if (usingAsyncLogger) {
+      sendEventToSystemProducer(encodeLogEventToBytes(event));
+      return;
+    }
+
+    // Serialize the event before adding to the queue to leverage the caller thread
+    // and ensure that the transferThread can keep up.
+    if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+      // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
+      // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+      // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+      // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+      // Scenario:
+      // T1: holds L1 and is waiting for L2
+      // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
+
+      // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+      // so dropping events in the StreamAppender is our best recourse.
+
+      // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
+      int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+      log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+          queueTimeoutS,
+          systemStream.toString(),
+          messagesDropped));

Review comment:
       Capturing our offline conversation: "We will need this to be replaced with System.err, since the recursion will drop this message if the lock cannot be acquired within the timeout."
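       A rough sketch of that change, assuming the warning is written straight to System.err so it does not route back through the appender. DropOnTimeoutSketch, its constructor, streamName, and the millisecond timeout are hypothetical choices for illustration (the diff uses queueTimeoutS in seconds); logQueue and the drain-and-count logic follow the diff.

```java
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the queueing path of StreamAppender.handleEvent.
class DropOnTimeoutSketch {
  private final BlockingQueue<byte[]> logQueue;
  private final long queueTimeoutMs; // assumption: ms keeps the sketch fast
  private final String streamName;   // assumption: stands in for systemStream

  DropOnTimeoutSketch(int capacity, long queueTimeoutMs, String streamName) {
    this.logQueue = new LinkedBlockingQueue<>(capacity);
    this.queueTimeoutMs = queueTimeoutMs;
    this.streamName = streamName;
  }

  // Returns the number of messages dropped (0 when the event was enqueued).
  int handleEvent(byte[] serializedEvent) throws InterruptedException {
    if (logQueue.offer(serializedEvent, queueTimeoutMs, TimeUnit.MILLISECONDS)) {
      return 0;
    }
    // Drain the queue and count the current event as dropped too.
    int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1;
    // Written to stderr rather than the logger, so the message
    // cannot re-enter the appender and be dropped itself.
    System.err.println(String.format(
        "Exceeded timeout %dms while trying to log to %s. Dropping %d log messages.",
        queueTimeoutMs, streamName, messagesDropped));
    return messagesDropped;
  }

  int queued() {
    return logQueue.size();
  }

  public static void main(String[] args) throws InterruptedException {
    DropOnTimeoutSketch sketch = new DropOnTimeoutSketch(1, 10, "example-stream");
    sketch.handleEvent(new byte[] {1});
    sketch.handleEvent(new byte[] {2}); // queue is full, so this one times out
  }
}
```

       The trade-off is that the warning bypasses the configured log4j2 layout and appenders, which seems acceptable here because the logging path itself is what is blocked.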



