You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/02 22:30:21 UTC

[beam] branch master updated: [BEAM-9399] Change DataflowWorkerLoggingHandler to report errors to the original System.err

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f34ecc3  [BEAM-9399] Change DataflowWorkerLoggingHandler to report errors to the original System.err
     new 0113e4b  Merge pull request #12825 from scwhittle/logging
f34ecc3 is described below

commit f34ecc3c7d8a197e3539f2ee6dcc22d8792be6fc
Author: Sam Whittle <sa...@google.com>
AuthorDate: Fri Sep 11 04:20:49 2020 -0700

    [BEAM-9399] Change DataflowWorkerLoggingHandler to report errors to the original System.err
    
    Currently such errors are logged to System.err which is a PrintStream that
    publishes to the handler.  This is perhaps unlikely to work if earlier publishing
    failed and additionally removes a potential deadlock between the PrintStream
    object sychronization and the Handler object synchronization. This was attempted
    to be fixed earlier by dissallowing the PrintStream object to be synchronized
    when calling into the handler.  However this is possible to be triggered by
    external synchronization on the PrintStream, such as that performed by
    Throwable.printStackTrace. Changing the PrintStream to use separate synchronization
    for buffering works in most cases but not for cases where the stream is externally
    synchronized.
---
 .../logging/DataflowWorkerLoggingInitializer.java  | 33 +++++++++++++++++++
 .../JulHandlerPrintStreamAdapterFactory.java       | 38 +++++++++-------------
 .../JulHandlerPrintStreamAdapterFactoryTest.java   |  8 +++++
 3 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
index 5668847..ebef8ea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.ErrorManager;
 import java.util.logging.Level;
 import java.util.logging.LogManager;
 import java.util.logging.Logger;
@@ -98,6 +99,35 @@ public class DataflowWorkerLoggingInitializer {
   private static PrintStream originalStdErr = System.err;
   private static boolean initialized = false;
 
+  // This is the same as ErrorManager except that it uses the provided
+  // print stream.
+  public static class PrintStreamErrorManager extends ErrorManager {
+    public PrintStreamErrorManager(PrintStream stream) {
+      this.stream = stream;
+    }
+
+    private PrintStream stream;
+    private boolean reported = false;
+
+    @Override
+    public synchronized void error(String msg, Exception ex, int code) {
+      if (reported) {
+        // We only report the first error, to avoid clogging
+        // the screen.
+        return;
+      }
+      reported = true;
+      String text = "java.util.logging.ErrorManager: " + code;
+      if (msg != null) {
+        text = text + ": " + msg;
+      }
+      stream.println(text);
+      if (ex != null) {
+        ex.printStackTrace(stream);
+      }
+    }
+  };
+
   private static DataflowWorkerLoggingHandler makeLoggingHandler(
       String filepathProperty, String defaultFilePath) throws IOException {
     String filepath = System.getProperty(filepathProperty, defaultFilePath);
@@ -105,6 +135,9 @@ public class DataflowWorkerLoggingInitializer {
     DataflowWorkerLoggingHandler handler =
         new DataflowWorkerLoggingHandler(filepath, filesizeMb * 1024L * 1024L);
     handler.setLevel(Level.ALL);
+    // To avoid potential deadlock between the handler and the System.err print stream, use the
+    // original stderr print stream for errors. See BEAM-9399.
+    handler.setErrorManager(new PrintStreamErrorManager(getOriginalStdErr()));
     return handler;
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index 08a995d..79ef587 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.worker.logging;
 
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -96,10 +94,10 @@ class JulHandlerPrintStreamAdapterFactory {
 
     @Override
     public void flush() {
-      publishIfNonEmpty(flushToString());
+      publishIfNonEmpty(flushBufferToString());
     }
 
-    private synchronized String flushToString() {
+    private synchronized String flushBufferToString() {
       if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
         buffer.setLength(buffer.length() - 1);
       }
@@ -167,7 +165,7 @@ class JulHandlerPrintStreamAdapterFactory {
           int startLength = buffer.length();
           buffer.append(decoded);
           if (flush || buffer.indexOf("\n", startLength) >= 0) {
-            msg = flushToString();
+            msg = flushBufferToString();
           }
         }
       }
@@ -218,7 +216,7 @@ class JulHandlerPrintStreamAdapterFactory {
         if (!flush) {
           return;
         }
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -232,7 +230,7 @@ class JulHandlerPrintStreamAdapterFactory {
         if (!flush) {
           return;
         }
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -252,7 +250,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(b);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -262,7 +260,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(c);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -272,7 +270,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(i);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -282,7 +280,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(l);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -292,7 +290,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(f);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -302,7 +300,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(d);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -312,7 +310,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(a);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -322,7 +320,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(s);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -332,7 +330,7 @@ class JulHandlerPrintStreamAdapterFactory {
       String msg;
       synchronized (this) {
         buffer.append(o);
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
     }
@@ -352,7 +350,7 @@ class JulHandlerPrintStreamAdapterFactory {
         if (buffer.indexOf("\n", startLength) < 0) {
           return this;
         }
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
       return this;
@@ -374,17 +372,13 @@ class JulHandlerPrintStreamAdapterFactory {
         if (!flush) {
           return this;
         }
-        msg = flushToString();
+        msg = flushBufferToString();
       }
       publishIfNonEmpty(msg);
       return this;
     }
 
-    // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
     private void publishIfNonEmpty(String message) {
-      checkState(
-          !Thread.holdsLock(this),
-          "BEAM-9399: publish should not be called with the lock as it may cause deadlock");
       if (message == null || message.isEmpty()) {
         return;
       }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
index ae0396f..eff5f34 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
@@ -134,6 +134,14 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
   }
 
   @Test
+  public void testLogThrowable() {
+    PrintStream printStream = createPrintStreamAdapter();
+    Throwable t = new RuntimeException("Test error");
+    t.printStackTrace(printStream);
+    assertThat(handler.getLogs(), hasLogItem("testLogThrowable"));
+  }
+
+  @Test
   public void testNoEmptyMessages() {
     try (PrintStream printStream = createPrintStreamAdapter()) {
       printStream.println("blah");