You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/11 19:28:28 UTC

[GitHub] [beam] scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391210501
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
+      print(new String(a, start, limit, Charset.defaultCharset()));
+    }
+
+    @Override
+    public synchronized void print(boolean b) {
+      buffer.append(b ? "true" : "false");
+    }
+
+    @Override
+    public synchronized void print(char c) {
+      buffer.append(c);
+    }
+
+    @Override
+    public synchronized void print(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public synchronized void print(long l) {
+      buffer.append(l);
+    }
+
+    @Override
+    public synchronized void print(float f) {
+      buffer.append(f);
+    }
+
+    @Override
+    public synchronized void print(double d) {
+      buffer.append(d);
+    }
+
+    @Override
+    public synchronized void print(char[] a) {
+      buffer.append(a);
+    }
+
+    @Override
+    public synchronized void print(String s) {
+      buffer.append(s);
+    }
+
+    @Override
+    public synchronized void print(Object o) {
+      buffer.append(o);
+    }
+
+    @Override
+    public void println() {
+      flush();
+    }
+
+    @Override
+    public void println(boolean b) {
+      String msg;
+      synchronized (this) {
+        buffer.append(b);
+        msg = flushToString();
       }
-      if (baos.size() == BUFFER_LIMIT) {
-        output();
+      publish(msg);
+    }
+
+    @Override
+    public void println(char c) {
+      String msg;
+      synchronized (this) {
+        buffer.append(c);
+        msg = flushToString();
       }
+      publish(msg);
     }
 
     @Override
-    public void flush() throws IOException {
-      output();
+    public void println(int i) {
+      String msg;
+      synchronized (this) {
+        buffer.append(i);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
     @Override
-    public void close() throws IOException {
-      output();
+    public void println(long l) {
+      String msg;
+      synchronized (this) {
+        buffer.append(l);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
-    private void output() {
-      // If nothing was output, do not log anything
-      if (baos.size() == 0) {
-        return;
+    @Override
+    public void println(float f) {
+      String msg;
+      synchronized (this) {
+        buffer.append(f);
+        msg = flushToString();
       }
-      try {
-        String message = baos.toString(StandardCharsets.UTF_8.name());
-        // Strip the new line if it exists
-        if (message.endsWith(System.lineSeparator())) {
-          message = message.substring(0, message.length() - System.lineSeparator().length());
-        }
+      publish(msg);
+    }
+
+    @Override
+    public void println(double d) {
+      String msg;
+      synchronized (this) {
+        buffer.append(d);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(char[] a) {
+      String msg;
+      synchronized (this) {
+        buffer.append(a);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(String s) {
+      String msg;
+      synchronized (this) {
+        buffer.append(s);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
 
-        publish(messageLevel, message);
-      } catch (UnsupportedEncodingException e) {
-        publish(
-            Level.SEVERE, String.format("Unable to decode string output to stdout/stderr %s", e));
+    @Override
+    public void println(Object o) {
+      String msg;
+      synchronized (this) {
+        buffer.append(o);
+        msg = flushToString();
       }
-      matched = 0;
-      baos.reset();
+      publish(msg);
     }
 
-    private void publish(Level level, String message) {
-      if (logger.isLoggable(level)) {
-        LogRecord log = new LogRecord(level, message);
+    @Override
+    public PrintStream format(String format, Object... args) {
+      return format(Locale.getDefault(), format, args);
+    }
+
+    @Override
+    public PrintStream format(Locale locale, String format, Object... args) {
+      String flushed;
+      int newlineIndex;
+      synchronized (this) {
+        int startLength = buffer.length();
+        Formatter formatter = new Formatter(buffer, locale);
+        formatter.format(format, args);
+        newlineIndex = buffer.indexOf("\n", startLength);
+        if (newlineIndex < 0) {
+          return this;
+        }
+        flushed = flushToString();
+      }
+      while (newlineIndex > 0) {
+        publish(flushed.substring(0, newlineIndex));
+        flushed = flushed.substring(newlineIndex + 1);
+        newlineIndex = flushed.indexOf('\n');
+      }
+      publish(flushed);
+      return this;
+    }
+
+    @Override
+    public synchronized PrintStream append(CharSequence cs, int start, int limit) {
+      buffer.append(cs.subSequence(start, limit));
+      return this;
+    }
+
+    // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
+    private void publish(Level messageLevel, String message) {
+      if (logger.isLoggable(messageLevel)) {
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services