You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/02 22:30:21 UTC
[beam] branch master updated: [BEAM-9399] Change
DataflowWorkerLoggingHandler to report errors to the original System.err
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f34ecc3 [BEAM-9399] Change DataflowWorkerLoggingHandler to report errors to the original System.err
new 0113e4b Merge pull request #12825 from scwhittle/logging
f34ecc3 is described below
commit f34ecc3c7d8a197e3539f2ee6dcc22d8792be6fc
Author: Sam Whittle <sa...@google.com>
AuthorDate: Fri Sep 11 04:20:49 2020 -0700
[BEAM-9399] Change DataflowWorkerLoggingHandler to report errors to the original System.err
Currently such errors are logged to System.err which is a PrintStream that
publishes to the handler. This is perhaps unlikely to work if earlier publishing
failed. This change additionally removes a potential deadlock between the PrintStream
object synchronization and the Handler object synchronization. This was attempted
to be fixed earlier by disallowing the PrintStream object to be synchronized
when calling into the handler. However, the deadlock can still be triggered by
external synchronization on the PrintStream, such as that performed by
Throwable.printStackTrace. Changing the PrintStream to use separate synchronization
for buffering works in most cases but not for cases where the stream is externally
synchronized.
---
.../logging/DataflowWorkerLoggingInitializer.java | 33 +++++++++++++++++++
.../JulHandlerPrintStreamAdapterFactory.java | 38 +++++++++-------------
.../JulHandlerPrintStreamAdapterFactoryTest.java | 8 +++++
3 files changed, 57 insertions(+), 22 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
index 5668847..ebef8ea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
+import java.util.logging.ErrorManager;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
@@ -98,6 +99,35 @@ public class DataflowWorkerLoggingInitializer {
private static PrintStream originalStdErr = System.err;
private static boolean initialized = false;
+ // This is the same as ErrorManager except that it uses the provided
+ // print stream.
+ public static class PrintStreamErrorManager extends ErrorManager {
+ public PrintStreamErrorManager(PrintStream stream) {
+ this.stream = stream;
+ }
+
+ private PrintStream stream;
+ private boolean reported = false;
+
+ @Override
+ public synchronized void error(String msg, Exception ex, int code) {
+ if (reported) {
+ // We only report the first error, to avoid clogging
+ // the screen.
+ return;
+ }
+ reported = true;
+ String text = "java.util.logging.ErrorManager: " + code;
+ if (msg != null) {
+ text = text + ": " + msg;
+ }
+ stream.println(text);
+ if (ex != null) {
+ ex.printStackTrace(stream);
+ }
+ }
+ };
+
private static DataflowWorkerLoggingHandler makeLoggingHandler(
String filepathProperty, String defaultFilePath) throws IOException {
String filepath = System.getProperty(filepathProperty, defaultFilePath);
@@ -105,6 +135,9 @@ public class DataflowWorkerLoggingInitializer {
DataflowWorkerLoggingHandler handler =
new DataflowWorkerLoggingHandler(filepath, filesizeMb * 1024L * 1024L);
handler.setLevel(Level.ALL);
+ // To avoid potential deadlock between the handler and the System.err print stream, use the
+ // original stderr print stream for errors. See BEAM-9399.
+ handler.setErrorManager(new PrintStreamErrorManager(getOriginalStdErr()));
return handler;
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index 08a995d..79ef587 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker.logging;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -96,10 +94,10 @@ class JulHandlerPrintStreamAdapterFactory {
@Override
public void flush() {
- publishIfNonEmpty(flushToString());
+ publishIfNonEmpty(flushBufferToString());
}
- private synchronized String flushToString() {
+ private synchronized String flushBufferToString() {
if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
buffer.setLength(buffer.length() - 1);
}
@@ -167,7 +165,7 @@ class JulHandlerPrintStreamAdapterFactory {
int startLength = buffer.length();
buffer.append(decoded);
if (flush || buffer.indexOf("\n", startLength) >= 0) {
- msg = flushToString();
+ msg = flushBufferToString();
}
}
}
@@ -218,7 +216,7 @@ class JulHandlerPrintStreamAdapterFactory {
if (!flush) {
return;
}
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -232,7 +230,7 @@ class JulHandlerPrintStreamAdapterFactory {
if (!flush) {
return;
}
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -252,7 +250,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(b);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -262,7 +260,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(c);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -272,7 +270,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(i);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -282,7 +280,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(l);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -292,7 +290,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(f);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -302,7 +300,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(d);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -312,7 +310,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(a);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -322,7 +320,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(s);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -332,7 +330,7 @@ class JulHandlerPrintStreamAdapterFactory {
String msg;
synchronized (this) {
buffer.append(o);
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
@@ -352,7 +350,7 @@ class JulHandlerPrintStreamAdapterFactory {
if (buffer.indexOf("\n", startLength) < 0) {
return this;
}
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
return this;
@@ -374,17 +372,13 @@ class JulHandlerPrintStreamAdapterFactory {
if (!flush) {
return this;
}
- msg = flushToString();
+ msg = flushBufferToString();
}
publishIfNonEmpty(msg);
return this;
}
- // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
private void publishIfNonEmpty(String message) {
- checkState(
- !Thread.holdsLock(this),
- "BEAM-9399: publish should not be called with the lock as it may cause deadlock");
if (message == null || message.isEmpty()) {
return;
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
index ae0396f..eff5f34 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
@@ -134,6 +134,14 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
}
@Test
+ public void testLogThrowable() {
+ PrintStream printStream = createPrintStreamAdapter();
+ Throwable t = new RuntimeException("Test error");
+ t.printStackTrace(printStream);
+ assertThat(handler.getLogs(), hasLogItem("testLogThrowable"));
+ }
+
+ @Test
public void testNoEmptyMessages() {
try (PrintStream printStream = createPrintStreamAdapter()) {
printStream.println("blah");