Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/11 00:14:18 UTC

[GitHub] [beam] scwhittle opened a new pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

scwhittle opened a new pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096
 
 
   This replaces the current implementation, a custom OutputStream wrapped by a standard PrintStream, with a custom PrintStream. That removes a possible deadlock between the PrintStream and the handler. The
   deadlock can occur if something beneath the handler lock attempts to use System.err, reversing
   the normal locking order.
   
   By subclassing PrintStream directly we can buffer into a StringBuilder instead of a ByteArrayOutputStream, which also avoids extra encoding and decoding between byte arrays and Strings.
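
A minimal sketch of the locking idea described above, not the code in this pull request: text is appended to a buffer under a private lock, and the sink is only called after that lock has been released, so a sink that itself prints to the stream cannot invert the lock order. The class name `BufferingPrintStream` and the `Consumer<String>` sink (standing in for the JUL `Handler`) are assumptions made for illustration.

```java
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a handler-backed PrintStream.
final class BufferingPrintStream extends PrintStream {
  private final StringBuilder buffer = new StringBuilder();
  private final Consumer<String> sink; // assumed stand-in for the JUL Handler

  BufferingPrintStream(Consumer<String> sink) {
    // The wrapped stream is never used because the methods below are overridden.
    super(new OutputStream() {
      @Override
      public void write(int b) {
        throw new UnsupportedOperationException("unused");
      }
    });
    this.sink = sink;
  }

  @Override
  public void print(String s) {
    synchronized (buffer) {
      buffer.append(s);
    }
  }

  @Override
  public void println(String s) {
    String line;
    synchronized (buffer) {
      buffer.append(s);
      line = drain();
    }
    sink.accept(line); // called without holding the buffer lock
  }

  @Override
  public void flush() {
    String pending;
    synchronized (buffer) {
      pending = drain();
    }
    if (!pending.isEmpty()) {
      sink.accept(pending); // called without holding the buffer lock
    }
  }

  // Caller must hold the buffer lock.
  private String drain() {
    String result = buffer.toString();
    buffer.setLength(0);
    return result;
  }

  public static void main(String[] args) {
    List<String> messages = new ArrayList<>();
    BufferingPrintStream stream = new BufferingPrintStream(messages::add);
    stream.print("partial ");
    stream.println("line");
    stream.flush();
    System.out.println(messages);
  }
}
```

Only `print(String)`, `println(String)` and `flush()` are overridden here to keep the sketch short; a real replacement for `System.err` would have to override the remaining `print`, `println` and `write` overloads as well.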
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606756154
 
 
   Fixed. Sorry for that, I ran tests but must have been on the wrong branch.


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391071339
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
 
 Review comment:
   Would it be an issue for multi-byte characters that have been split across `write` calls?
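
To make the concern concrete, here is a small sketch of what happens when the bytes of one UTF-8 character arrive in two separate ranges and each range is decoded on its own. It uses `new String(byte[], int, int, Charset)` purely for illustration, which is an assumption and not the decoding path of the class under review; `SplitCharacterDemo` and `decodePerChunk` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

final class SplitCharacterDemo {
  /** Decodes the two halves of the array independently and joins the results. */
  static String decodePerChunk(byte[] bytes, int splitAt) {
    String first = new String(bytes, 0, splitAt, StandardCharsets.UTF_8);
    String second =
        new String(bytes, splitAt, bytes.length - splitAt, StandardCharsets.UTF_8);
    return first + second;
  }

  public static void main(String[] args) {
    // U+00E9 (e with acute accent) is encoded as two bytes in UTF-8.
    byte[] bytes = "\u00e9".getBytes(StandardCharsets.UTF_8);

    // Split on a character boundary: the character survives.
    System.out.println(decodePerChunk(bytes, 2).equals("\u00e9"));

    // Split in the middle of the character: each half is malformed on its own,
    // so the original character is lost and replacement characters appear.
    System.out.println(decodePerChunk(bytes, 1).equals("\u00e9"));
  }
}
```

Any approach that treats each byte range as a complete string has the same weakness, which is why the boundary handling matters here.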


[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606717579
 
 
   > I pushed fixes for both changes you requested, but it isn't letting me resolve your requested changes. In the future, should I push commits and only squash them once review is complete? Thanks!
   
   You shouldn't squash commits while a review is in progress, because it makes it harder for the reviewer to see the diff between versions. It also messes up parts of the comment/suggestion history. Once you get an LGTM, you can either squash and fix up your commit history yourself or let the reviewer squash and merge your commits.


[GitHub] [beam] lukecwik merged pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096
 
 
   


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391071339
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
 
 Review comment:
   It would be an issue for multi-byte wide characters.


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391086037
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
 
 Review comment:
   It makes a lot more sense to move all the newline handling into `publish` and have it check for newlines. If you want to avoid the `indexOf` call, you could create an internal method, `publishWithNewLines`, that does all the substring/indexOf work.
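
One way the suggested helper could look, as a sketch under assumptions rather than the code that was eventually merged: the method walks the text with `indexOf`, emits one message per line with `substring`, and publishes any trailing text that has no newline. The `Consumer<String>` parameter stands in for the real `publish` method, and the class name `NewlineSplitter` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class NewlineSplitter {
  /** Hypothetical helper: publishes one message per line contained in text. */
  static void publishWithNewLines(String text, Consumer<String> publish) {
    int start = 0;
    int newline;
    while ((newline = text.indexOf('\n', start)) >= 0) {
      int end = newline;
      // Tolerate Windows line endings by dropping a preceding carriage return.
      if (end > start && text.charAt(end - 1) == '\r') {
        end--;
      }
      publish.accept(text.substring(start, end));
      start = newline + 1;
    }
    // Publish whatever follows the last newline, if anything.
    if (start < text.length()) {
      publish.accept(text.substring(start));
    }
  }

  public static void main(String[] args) {
    List<String> lines = new ArrayList<>();
    publishWithNewLines("first\nsecond\r\nthird", lines::add);
    System.out.println(lines);
  }
}
```

This version publishes empty lines as empty messages; whether those should be dropped instead is a design choice the review comment does not settle.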


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391929052
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +43,328 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
-    private Logger logger;
+    private final Logger logger;
 
-    private Handler handler;
-    private String loggerName;
-    private ByteArrayOutputStream baos;
-    private Level messageLevel;
-    private int matched = 0;
+    private final Handler handler;
+    private final String loggerName;
+    private final StringBuilder buffer;
+    private final Level messageLevel;
+    private final CharsetDecoder decoder;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
+      this.decoder = Charset.defaultCharset().newDecoder();
+    }
+
+    @Override
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
+      }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
+
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      CharBuffer decoded;
+      synchronized (decoder) {
+        try {
+          decoded = decoder.decode(ByteBuffer.wrap(a, start, limit));
 
 Review comment:
   You need to use the stateful version of `CharsetDecoder.decode`, i.e. `decode(ByteBuffer, CharBuffer, boolean)`, since the single-argument method used here assumes that the byte[] range represents a whole, valid String.
   
   In the other methods you would need to finish the decoding if a partial decoding is in flight.
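
A sketch of the stateful approach, assuming UTF-8 and a hypothetical `StreamingDecoder` class rather than the PR's actual fields: each call passes `endOfInput = false` to `CharsetDecoder.decode(ByteBuffer, CharBuffer, boolean)`, so the bytes of an incomplete trailing character are left unread and carried into the next call. `finish()` then ends the decoding with `endOfInput = true`, at which point any leftover bytes are treated as malformed and replaced.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing stateful decoding across write calls.
final class StreamingDecoder {
  private final CharsetDecoder decoder =
      StandardCharsets.UTF_8
          .newDecoder()
          .onMalformedInput(CodingErrorAction.REPLACE)
          .onUnmappableCharacter(CodingErrorAction.REPLACE);
  // Bytes of an incomplete character left over from the previous call.
  private final ByteBuffer carry = ByteBuffer.allocate(16);
  private final StringBuilder decodedText = new StringBuilder();

  synchronized void write(byte[] a, int offset, int length) {
    // Prepend any carried-over bytes to the new input.
    carry.flip();
    ByteBuffer in = ByteBuffer.allocate(carry.remaining() + length);
    in.put(carry).put(a, offset, length);
    in.flip();
    carry.clear();

    int capacity = (int) Math.ceil(in.remaining() * decoder.maxCharsPerByte());
    CharBuffer chars = CharBuffer.allocate(capacity);
    // endOfInput=false: a trailing partial character stays unread in `in`.
    decoder.decode(in, chars, false);
    chars.flip();
    decodedText.append(chars);
    carry.put(in); // keep the unread bytes for the next call
  }

  /** Ends the current decoding operation and returns everything decoded. */
  synchronized String finish() {
    carry.flip();
    CharBuffer chars = CharBuffer.allocate(carry.remaining() + 1);
    // endOfInput=true: leftover bytes can no longer be completed.
    decoder.decode(carry, chars, true);
    decoder.flush(chars);
    decoder.reset();
    chars.flip();
    decodedText.append(chars);
    carry.clear();
    String result = decodedText.toString();
    decodedText.setLength(0);
    return result;
  }

  public static void main(String[] args) {
    byte[] bytes = "h\u00e9".getBytes(StandardCharsets.UTF_8); // 3 bytes
    StreamingDecoder decoder = new StreamingDecoder();
    decoder.write(bytes, 0, 2); // ends in the middle of the second character
    decoder.write(bytes, 2, 1); // supplies the missing byte
    System.out.println(decoder.finish().equals("h\u00e9"));
  }
}
```

The fixed 16-byte carry buffer is an assumption that holds for UTF-8, where an incomplete character is at most three bytes long; a decoder for `Charset.defaultCharset()` would need the same care for whatever charset is in effect.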


[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606690268
 
 
   retest this please

[GitHub] [beam] scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606294220
 
 
   I pushed fixes for both changes you requested, but it isn't letting me close your requested changes. In the future, should I push separate commits and only squash them once reviewing is complete? Thanks!

[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606839742
 
 
   https://builds.apache.org/job/beam_PreCommit_Java_Commit/10616/ passed, merging.

[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391070326
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
 
 Review comment:
   Why do you think the newline should be stripped?

[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391086037
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
 
 Review comment:
   It makes a lot more sense to move all the newline handling into `publish` and have it check for newlines. If you want to avoid the cost of the indexOf/substring calls, you could create an internal method, `publishWithNewLines`, that does all the substring/indexOf work, and use `publish` only for the trivial println methods.
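
A rough sketch of that split, under stated assumptions: `publishWithNewLines` is the name proposed in the comment, but its body here is a guess at the intent, the `published` list is a stand-in for handing a `LogRecord` to the JUL `Handler`, and only `\n` is treated as a line break.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the code that was merged.
final class NewlinePublishSketch {
  // Stand-in sink so the sketch is self-contained; the real class publishes to a Handler.
  final List<String> published = new ArrayList<>();

  // Cheap path: the caller knows the message is a single line (for example from println).
  void publish(String message) {
    published.add(message);
  }

  // Slower path: splits on '\n' and publishes each line as a separate message.
  void publishWithNewLines(String message) {
    int start = 0;
    int newline;
    while ((newline = message.indexOf('\n', start)) >= 0) {
      publish(message.substring(start, newline));
      start = newline + 1;
    }
    // Publish the remainder only if something follows the last newline.
    if (start < message.length()) {
      publish(message.substring(start));
    }
  }

  public static void main(String[] args) {
    NewlinePublishSketch sketch = new NewlinePublishSketch();
    sketch.publishWithNewLines("first\nsecond\n");
    sketch.publish("third");
    System.out.println(sketch.published);
  }
}
```

With this shape, trailing-newline stripping no longer needs to live in `flushToString`; whether an empty line between two newlines should become an empty log record is a policy choice this sketch does not settle (it publishes one).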

[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391062529
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
+      print(new String(a, start, limit, Charset.defaultCharset()));
+    }
+
+    @Override
+    public synchronized void print(boolean b) {
+      buffer.append(b ? "true" : "false");
+    }
+
+    @Override
+    public synchronized void print(char c) {
+      buffer.append(c);
+    }
+
+    @Override
+    public synchronized void print(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public synchronized void print(long l) {
+      buffer.append(l);
+    }
+
+    @Override
+    public synchronized void print(float f) {
+      buffer.append(f);
+    }
+
+    @Override
+    public synchronized void print(double d) {
+      buffer.append(d);
+    }
+
+    @Override
+    public synchronized void print(char[] a) {
+      buffer.append(a);
+    }
+
+    @Override
+    public synchronized void print(String s) {
+      buffer.append(s);
+    }
+
+    @Override
+    public synchronized void print(Object o) {
+      buffer.append(o);
+    }
+
+    @Override
+    public void println() {
+      flush();
+    }
+
+    @Override
+    public void println(boolean b) {
+      String msg;
+      synchronized (this) {
+        buffer.append(b);
+        msg = flushToString();
       }
-      if (baos.size() == BUFFER_LIMIT) {
-        output();
+      publish(msg);
+    }
+
+    @Override
+    public void println(char c) {
+      String msg;
+      synchronized (this) {
+        buffer.append(c);
+        msg = flushToString();
       }
+      publish(msg);
     }
 
     @Override
-    public void flush() throws IOException {
-      output();
+    public void println(int i) {
+      String msg;
+      synchronized (this) {
+        buffer.append(i);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
     @Override
-    public void close() throws IOException {
-      output();
+    public void println(long l) {
+      String msg;
+      synchronized (this) {
+        buffer.append(l);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
-    private void output() {
-      // If nothing was output, do not log anything
-      if (baos.size() == 0) {
-        return;
+    @Override
+    public void println(float f) {
+      String msg;
+      synchronized (this) {
+        buffer.append(f);
+        msg = flushToString();
       }
-      try {
-        String message = baos.toString(StandardCharsets.UTF_8.name());
-        // Strip the new line if it exists
-        if (message.endsWith(System.lineSeparator())) {
-          message = message.substring(0, message.length() - System.lineSeparator().length());
-        }
+      publish(msg);
+    }
+
+    @Override
+    public void println(double d) {
+      String msg;
+      synchronized (this) {
+        buffer.append(d);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(char[] a) {
+      String msg;
+      synchronized (this) {
+        buffer.append(a);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(String s) {
+      String msg;
+      synchronized (this) {
+        buffer.append(s);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
 
-        publish(messageLevel, message);
-      } catch (UnsupportedEncodingException e) {
-        publish(
-            Level.SEVERE, String.format("Unable to decode string output to stdout/stderr %s", e));
+    @Override
+    public void println(Object o) {
+      String msg;
+      synchronized (this) {
+        buffer.append(o);
+        msg = flushToString();
       }
-      matched = 0;
-      baos.reset();
+      publish(msg);
     }
 
-    private void publish(Level level, String message) {
-      if (logger.isLoggable(level)) {
-        LogRecord log = new LogRecord(level, message);
+    @Override
+    public PrintStream format(String format, Object... args) {
+      return format(Locale.getDefault(), format, args);
+    }
+
+    @Override
+    public PrintStream format(Locale locale, String format, Object... args) {
+      String flushed;
+      int newlineIndex;
+      synchronized (this) {
+        int startLength = buffer.length();
+        Formatter formatter = new Formatter(buffer, locale);
+        formatter.format(format, args);
+        newlineIndex = buffer.indexOf("\n", startLength);
+        if (newlineIndex < 0) {
+          return this;
+        }
+        flushed = flushToString();
+      }
+      while (newlineIndex > 0) {
 
 Review comment:
   Flushing is only done if there is a `\n` anywhere in the String; it doesn't need to be done per `\n`.
   
   https://github.com/openjdk-mirror/jdk7u-jdk/blob/f4d80957e89a19a29bb9f9807d2a28351ed7f7df/src/share/classes/java/io/PrintStream.java#L528
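
As a sketch of the behaviour described here, assuming a simplified stand-in class: `FlushOnceSketch` and its `published` list are hypothetical, while `java.util.Formatter` and `StringBuilder.indexOf` are real JDK API. The formatted text is appended to the buffer, and the whole buffer is flushed once if the newly formatted part contains any `\n`.

```java
import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;
import java.util.Locale;

// Hypothetical illustration of "flush once if the formatted text contains a newline".
final class FlushOnceSketch {
  private final StringBuilder buffer = new StringBuilder();
  // Stand-in sink; the real class publishes a LogRecord to a JUL Handler.
  final List<String> published = new ArrayList<>();

  void format(Locale locale, String format, Object... args) {
    String flushed;
    synchronized (this) {
      int startLength = buffer.length();
      new Formatter(buffer, locale).format(format, args);
      // Only the newly appended text is searched for a newline.
      if (buffer.indexOf("\n", startLength) < 0) {
        return; // keep buffering until a newline or an explicit flush
      }
      flushed = buffer.toString();
      buffer.setLength(0);
    }
    // Published outside the lock, matching the note in the diff about BEAM-9399.
    publish(flushed);
  }

  private void publish(String message) {
    published.add(message);
  }

  public static void main(String[] args) {
    FlushOnceSketch sketch = new FlushOnceSketch();
    sketch.format(Locale.ROOT, "a=%d", 1); // no newline, stays buffered
    sketch.format(Locale.ROOT, " b=%d\nc", 2); // newline present, whole buffer flushed once
    System.out.println(sketch.published.size());
  }
}
```

Compared with the loop in the diff, this publishes one message that may contain embedded newlines rather than one message per line.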

[GitHub] [beam] scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391210501
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
+      print(new String(a, start, limit, Charset.defaultCharset()));
+    }
+
+    @Override
+    public synchronized void print(boolean b) {
+      buffer.append(b ? "true" : "false");
+    }
+
+    @Override
+    public synchronized void print(char c) {
+      buffer.append(c);
+    }
+
+    @Override
+    public synchronized void print(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public synchronized void print(long l) {
+      buffer.append(l);
+    }
+
+    @Override
+    public synchronized void print(float f) {
+      buffer.append(f);
+    }
+
+    @Override
+    public synchronized void print(double d) {
+      buffer.append(d);
+    }
+
+    @Override
+    public synchronized void print(char[] a) {
+      buffer.append(a);
+    }
+
+    @Override
+    public synchronized void print(String s) {
+      buffer.append(s);
+    }
+
+    @Override
+    public synchronized void print(Object o) {
+      buffer.append(o);
+    }
+
+    @Override
+    public void println() {
+      flush();
+    }
+
+    @Override
+    public void println(boolean b) {
+      String msg;
+      synchronized (this) {
+        buffer.append(b);
+        msg = flushToString();
       }
-      if (baos.size() == BUFFER_LIMIT) {
-        output();
+      publish(msg);
+    }
+
+    @Override
+    public void println(char c) {
+      String msg;
+      synchronized (this) {
+        buffer.append(c);
+        msg = flushToString();
       }
+      publish(msg);
     }
 
     @Override
-    public void flush() throws IOException {
-      output();
+    public void println(int i) {
+      String msg;
+      synchronized (this) {
+        buffer.append(i);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
     @Override
-    public void close() throws IOException {
-      output();
+    public void println(long l) {
+      String msg;
+      synchronized (this) {
+        buffer.append(l);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
-    private void output() {
-      // If nothing was output, do not log anything
-      if (baos.size() == 0) {
-        return;
+    @Override
+    public void println(float f) {
+      String msg;
+      synchronized (this) {
+        buffer.append(f);
+        msg = flushToString();
       }
-      try {
-        String message = baos.toString(StandardCharsets.UTF_8.name());
-        // Strip the new line if it exists
-        if (message.endsWith(System.lineSeparator())) {
-          message = message.substring(0, message.length() - System.lineSeparator().length());
-        }
+      publish(msg);
+    }
+
+    @Override
+    public void println(double d) {
+      String msg;
+      synchronized (this) {
+        buffer.append(d);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(char[] a) {
+      String msg;
+      synchronized (this) {
+        buffer.append(a);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(String s) {
+      String msg;
+      synchronized (this) {
+        buffer.append(s);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
 
-        publish(messageLevel, message);
-      } catch (UnsupportedEncodingException e) {
-        publish(
-            Level.SEVERE, String.format("Unable to decode string output to stdout/stderr %s", e));
+    @Override
+    public void println(Object o) {
+      String msg;
+      synchronized (this) {
+        buffer.append(o);
+        msg = flushToString();
       }
-      matched = 0;
-      baos.reset();
+      publish(msg);
     }
 
-    private void publish(Level level, String message) {
-      if (logger.isLoggable(level)) {
-        LogRecord log = new LogRecord(level, message);
+    @Override
+    public PrintStream format(String format, Object... args) {
+      return format(Locale.getDefault(), format, args);
+    }
+
+    @Override
+    public PrintStream format(Locale locale, String format, Object... args) {
+      String flushed;
+      int newlineIndex;
+      synchronized (this) {
+        int startLength = buffer.length();
+        Formatter formatter = new Formatter(buffer, locale);
+        formatter.format(format, args);
+        newlineIndex = buffer.indexOf("\n", startLength);
+        if (newlineIndex < 0) {
+          return this;
+        }
+        flushed = flushToString();
+      }
+      while (newlineIndex > 0) {
+        publish(flushed.substring(0, newlineIndex));
+        flushed = flushed.substring(newlineIndex + 1);
+        newlineIndex = flushed.indexOf('\n');
+      }
+      publish(flushed);
+      return this;
+    }
+
+    @Override
+    public synchronized PrintStream append(CharSequence cs, int start, int limit) {
+      buffer.append(cs.subSequence(start, limit));
+      return this;
+    }
+
+    // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
+    private void publish(Level messageLevel, String message) {
+      if (logger.isLoggable(messageLevel)) {
 
 Review comment:
   Done

[GitHub] [beam] scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-597826641
 
 
   > Thanks for the detailed pull request description. I was able to understand the reason for this and actually able to do a review. Is there a way to test this? (not the deadlock or lack thereof but just the basic functionality)
   
   There is an existing test for this, `JulHandlerPrintStreamAdapterFactoryTest`. Let me know if there are additions you would like there.
   


[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606785621
 
 
   retest this please


[GitHub] [beam] scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391210676
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
 
 Review comment:
   Added a CharsetDecoder, which this method now uses.
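
For illustration only, here is a minimal sketch of how a `CharsetDecoder` can carry an incomplete multi-byte character across successive `write(byte[], int, int)` calls, which is the boundary concern raised in the `XXX` comment. The class name `IncrementalUtf8Decoder`, the `carry` field, and the choice of UTF-8 with `REPLACE` error handling are assumptions made for this example; it is not the code that was added in the PR.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not the PR's code: decodes UTF-8 bytes arriving in arbitrary chunks.
final class IncrementalUtf8Decoder {
  private final CharsetDecoder decoder =
      StandardCharsets.UTF_8
          .newDecoder()
          .onMalformedInput(CodingErrorAction.REPLACE)
          .onUnmappableCharacter(CodingErrorAction.REPLACE);

  // Bytes of an incomplete character left over from the previous call.
  private ByteBuffer carry = ByteBuffer.allocate(0);

  // Appends every completely decoded character to out and keeps any incomplete tail.
  synchronized void decode(byte[] a, int offset, int length, StringBuilder out) {
    ByteBuffer in = ByteBuffer.allocate(carry.remaining() + length);
    in.put(carry);
    in.put(a, offset, length);
    in.flip();

    int maxChars = (int) Math.ceil(in.remaining() * (double) decoder.maxCharsPerByte());
    CharBuffer chars = CharBuffer.allocate(maxChars);
    decoder.decode(in, chars, false); // false: more input may follow
    chars.flip();
    out.append(chars);

    // Whatever the decoder did not consume is an incomplete sequence; save it.
    carry = ByteBuffer.allocate(in.remaining());
    carry.put(in);
    carry.flip();
  }
}
```

With this shape, a character whose bytes are split between two writes is emitted only once the final byte arrives, instead of being decoded as two malformed fragments.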


[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606757069
 
 
   retest this please


[GitHub] [beam] scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391212343
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
 
 Review comment:
   The previous code stripped trailing newlines when publishing, which makes sense to me since the final log message does not need them.
   Is there a reason not to do it here? This is the single call site, and it avoids creating a substring.
   
   The newline check that triggers flushing does seem to need to live in each function, since otherwise publish would never be called.


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391062529
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
+      print(new String(a, start, limit, Charset.defaultCharset()));
+    }
+
+    @Override
+    public synchronized void print(boolean b) {
+      buffer.append(b ? "true" : "false");
+    }
+
+    @Override
+    public synchronized void print(char c) {
+      buffer.append(c);
+    }
+
+    @Override
+    public synchronized void print(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public synchronized void print(long l) {
+      buffer.append(l);
+    }
+
+    @Override
+    public synchronized void print(float f) {
+      buffer.append(f);
+    }
+
+    @Override
+    public synchronized void print(double d) {
+      buffer.append(d);
+    }
+
+    @Override
+    public synchronized void print(char[] a) {
+      buffer.append(a);
+    }
+
+    @Override
+    public synchronized void print(String s) {
+      buffer.append(s);
+    }
+
+    @Override
+    public synchronized void print(Object o) {
+      buffer.append(o);
+    }
+
+    @Override
+    public void println() {
+      flush();
+    }
+
+    @Override
+    public void println(boolean b) {
+      String msg;
+      synchronized (this) {
+        buffer.append(b);
+        msg = flushToString();
       }
-      if (baos.size() == BUFFER_LIMIT) {
-        output();
+      publish(msg);
+    }
+
+    @Override
+    public void println(char c) {
+      String msg;
+      synchronized (this) {
+        buffer.append(c);
+        msg = flushToString();
       }
+      publish(msg);
     }
 
     @Override
-    public void flush() throws IOException {
-      output();
+    public void println(int i) {
+      String msg;
+      synchronized (this) {
+        buffer.append(i);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
     @Override
-    public void close() throws IOException {
-      output();
+    public void println(long l) {
+      String msg;
+      synchronized (this) {
+        buffer.append(l);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
-    private void output() {
-      // If nothing was output, do not log anything
-      if (baos.size() == 0) {
-        return;
+    @Override
+    public void println(float f) {
+      String msg;
+      synchronized (this) {
+        buffer.append(f);
+        msg = flushToString();
       }
-      try {
-        String message = baos.toString(StandardCharsets.UTF_8.name());
-        // Strip the new line if it exists
-        if (message.endsWith(System.lineSeparator())) {
-          message = message.substring(0, message.length() - System.lineSeparator().length());
-        }
+      publish(msg);
+    }
+
+    @Override
+    public void println(double d) {
+      String msg;
+      synchronized (this) {
+        buffer.append(d);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(char[] a) {
+      String msg;
+      synchronized (this) {
+        buffer.append(a);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(String s) {
+      String msg;
+      synchronized (this) {
+        buffer.append(s);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
 
-        publish(messageLevel, message);
-      } catch (UnsupportedEncodingException e) {
-        publish(
-            Level.SEVERE, String.format("Unable to decode string output to stdout/stderr %s", e));
+    @Override
+    public void println(Object o) {
+      String msg;
+      synchronized (this) {
+        buffer.append(o);
+        msg = flushToString();
       }
-      matched = 0;
-      baos.reset();
+      publish(msg);
     }
 
-    private void publish(Level level, String message) {
-      if (logger.isLoggable(level)) {
-        LogRecord log = new LogRecord(level, message);
+    @Override
+    public PrintStream format(String format, Object... args) {
+      return format(Locale.getDefault(), format, args);
+    }
+
+    @Override
+    public PrintStream format(Locale locale, String format, Object... args) {
+      String flushed;
+      int newlineIndex;
+      synchronized (this) {
+        int startLength = buffer.length();
+        Formatter formatter = new Formatter(buffer, locale);
+        formatter.format(format, args);
+        newlineIndex = buffer.indexOf("\n", startLength);
+        if (newlineIndex < 0) {
+          return this;
+        }
+        flushed = flushToString();
+      }
+      while (newlineIndex > 0) {
 
 Review comment:
   Flushing only needs to happen once if there is a `\n` anywhere in the formatted String; it does not need to happen per `\n`.
   
   https://github.com/openjdk-mirror/jdk7u-jdk/blob/f4d80957e89a19a29bb9f9807d2a28351ed7f7df/src/share/classes/java/io/PrintStream.java#L528
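
As a rough sketch of the suggestion above, the format path could check once whether the newly formatted text contains a `\n` and, if so, drain the whole buffer as a single message. The class `FormatFlushSketch` and the method `formatAndMaybeDrain` are hypothetical names used only for this example, and whether embedded newlines should stay inside one log message is a design choice the PR authors would have to make.

```java
import java.util.Formatter;
import java.util.Locale;

// Hypothetical sketch: one flush decision per format call instead of one per newline.
final class FormatFlushSketch {
  private final StringBuilder buffer = new StringBuilder();

  // Returns the text to publish, or null when the formatted output had no newline.
  synchronized String formatAndMaybeDrain(Locale locale, String format, Object... args) {
    int start = buffer.length();
    new Formatter(buffer, locale).format(format, args);
    // Only the newly appended region needs to be searched.
    if (buffer.indexOf("\n", start) < 0) {
      return null;
    }
    String message = buffer.toString();
    buffer.setLength(0);
    return message;
  }
}
```

The caller would then pass a non-null result to `publish` after the synchronized method has returned, so the lock is not held while logging.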


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391075262
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
+      print(new String(a, start, limit, Charset.defaultCharset()));
+    }
+
+    @Override
+    public synchronized void print(boolean b) {
+      buffer.append(b ? "true" : "false");
+    }
+
+    @Override
+    public synchronized void print(char c) {
+      buffer.append(c);
+    }
+
+    @Override
+    public synchronized void print(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public synchronized void print(long l) {
+      buffer.append(l);
+    }
+
+    @Override
+    public synchronized void print(float f) {
+      buffer.append(f);
+    }
+
+    @Override
+    public synchronized void print(double d) {
+      buffer.append(d);
+    }
+
+    @Override
+    public synchronized void print(char[] a) {
+      buffer.append(a);
+    }
+
+    @Override
+    public synchronized void print(String s) {
+      buffer.append(s);
+    }
+
+    @Override
+    public synchronized void print(Object o) {
+      buffer.append(o);
+    }
+
+    @Override
+    public void println() {
+      flush();
+    }
+
+    @Override
+    public void println(boolean b) {
+      String msg;
+      synchronized (this) {
+        buffer.append(b);
+        msg = flushToString();
       }
-      if (baos.size() == BUFFER_LIMIT) {
-        output();
+      publish(msg);
+    }
+
+    @Override
+    public void println(char c) {
+      String msg;
+      synchronized (this) {
+        buffer.append(c);
+        msg = flushToString();
       }
+      publish(msg);
     }
 
     @Override
-    public void flush() throws IOException {
-      output();
+    public void println(int i) {
+      String msg;
+      synchronized (this) {
+        buffer.append(i);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
     @Override
-    public void close() throws IOException {
-      output();
+    public void println(long l) {
+      String msg;
+      synchronized (this) {
+        buffer.append(l);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
-    private void output() {
-      // If nothing was output, do not log anything
-      if (baos.size() == 0) {
-        return;
+    @Override
+    public void println(float f) {
+      String msg;
+      synchronized (this) {
+        buffer.append(f);
+        msg = flushToString();
       }
-      try {
-        String message = baos.toString(StandardCharsets.UTF_8.name());
-        // Strip the new line if it exists
-        if (message.endsWith(System.lineSeparator())) {
-          message = message.substring(0, message.length() - System.lineSeparator().length());
-        }
+      publish(msg);
+    }
+
+    @Override
+    public void println(double d) {
+      String msg;
+      synchronized (this) {
+        buffer.append(d);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(char[] a) {
+      String msg;
+      synchronized (this) {
+        buffer.append(a);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(String s) {
+      String msg;
+      synchronized (this) {
+        buffer.append(s);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
 
-        publish(messageLevel, message);
-      } catch (UnsupportedEncodingException e) {
-        publish(
-            Level.SEVERE, String.format("Unable to decode string output to stdout/stderr %s", e));
+    @Override
+    public void println(Object o) {
+      String msg;
+      synchronized (this) {
+        buffer.append(o);
+        msg = flushToString();
       }
-      matched = 0;
-      baos.reset();
+      publish(msg);
     }
 
-    private void publish(Level level, String message) {
-      if (logger.isLoggable(level)) {
-        LogRecord log = new LogRecord(level, message);
+    @Override
+    public PrintStream format(String format, Object... args) {
+      return format(Locale.getDefault(), format, args);
+    }
+
+    @Override
+    public PrintStream format(Locale locale, String format, Object... args) {
+      String flushed;
+      int newlineIndex;
+      synchronized (this) {
+        int startLength = buffer.length();
+        Formatter formatter = new Formatter(buffer, locale);
+        formatter.format(format, args);
+        newlineIndex = buffer.indexOf("\n", startLength);
+        if (newlineIndex < 0) {
+          return this;
+        }
+        flushed = flushToString();
+      }
+      while (newlineIndex > 0) {
+        publish(flushed.substring(0, newlineIndex));
+        flushed = flushed.substring(newlineIndex + 1);
+        newlineIndex = flushed.indexOf('\n');
+      }
+      publish(flushed);
+      return this;
+    }
+
+    @Override
+    public synchronized PrintStream append(CharSequence cs, int start, int limit) {
+      buffer.append(cs.subSequence(start, limit));
+      return this;
+    }
+
+    // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
+    private void publish(Level messageLevel, String message) {
+      if (logger.isLoggable(messageLevel)) {
 
 Review comment:
   Either use `assert !Thread.holdsLock(this) : "BEAM-9399: This thread should never hold this lock";` or `checkState(!Thread.holdsLock(this), "BEAM-9399: This thread should never hold this lock");` to guard against this case.
   
   https://docs.oracle.com/javase/7/docs/api/java/lang/Thread.html#holdsLock%28java.lang.Object%29
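
To make the suggested guard concrete, here is a small self-contained sketch. `PublishGuardSketch`, its `published` list, and `brokenPrintln` are hypothetical and exist only to demonstrate `Thread.holdsLock`; the explicit `IllegalStateException` stands in for Guava's `checkState`, which the real code may or may not use.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of guarding publish() against being called under the stream's lock.
final class PublishGuardSketch {
  private final StringBuilder buffer = new StringBuilder();
  final List<String> published = new CopyOnWriteArrayList<>();

  void println(String s) {
    String msg;
    synchronized (this) {
      buffer.append(s);
      msg = buffer.toString();
      buffer.setLength(0);
    }
    publish(msg); // called after the lock has been released
  }

  void publish(String message) {
    // Fails fast if a caller reintroduces the lock ordering that can deadlock.
    if (Thread.holdsLock(this)) {
      throw new IllegalStateException("BEAM-9399: This thread should never hold this lock");
    }
    published.add(message);
  }

  // Deliberately wrong: calls publish() while synchronized, so the guard trips.
  synchronized void brokenPrintln(String s) {
    publish(s);
  }
}
```

An `assert` would have the same effect only when assertions are enabled with `-ea`, whereas an unconditional check always runs.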


[GitHub] [beam] scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r400541708
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
 ##########
 @@ -115,6 +116,22 @@ public void testLogOnClose() {
     assertThat(handler.getLogs(), hasLogItem("blah"));
   }
 
+  @Test
+  public void testLogRawBytes() {
+    PrintStream printStream = createPrintStreamAdapter();
+    String msg = "♠ ♡ ♢ ♣ ♤ ♥ ♦ ♧";
+    byte[] bytes = msg.getBytes(UTF_8);
 
 Review comment:
   good catch, done.


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391070326
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
 
 Review comment:
   Why do you think the newline should be stripped?


[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606684667
 
 
   retest this please


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391075262
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
+      print(new String(a, start, limit, Charset.defaultCharset()));
+    }
+
+    @Override
+    public synchronized void print(boolean b) {
+      buffer.append(b ? "true" : "false");
+    }
+
+    @Override
+    public synchronized void print(char c) {
+      buffer.append(c);
+    }
+
+    @Override
+    public synchronized void print(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public synchronized void print(long l) {
+      buffer.append(l);
+    }
+
+    @Override
+    public synchronized void print(float f) {
+      buffer.append(f);
+    }
+
+    @Override
+    public synchronized void print(double d) {
+      buffer.append(d);
+    }
+
+    @Override
+    public synchronized void print(char[] a) {
+      buffer.append(a);
+    }
+
+    @Override
+    public synchronized void print(String s) {
+      buffer.append(s);
+    }
+
+    @Override
+    public synchronized void print(Object o) {
+      buffer.append(o);
+    }
+
+    @Override
+    public void println() {
+      flush();
+    }
+
+    @Override
+    public void println(boolean b) {
+      String msg;
+      synchronized (this) {
+        buffer.append(b);
+        msg = flushToString();
       }
-      if (baos.size() == BUFFER_LIMIT) {
-        output();
+      publish(msg);
+    }
+
+    @Override
+    public void println(char c) {
+      String msg;
+      synchronized (this) {
+        buffer.append(c);
+        msg = flushToString();
       }
+      publish(msg);
     }
 
     @Override
-    public void flush() throws IOException {
-      output();
+    public void println(int i) {
+      String msg;
+      synchronized (this) {
+        buffer.append(i);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
     @Override
-    public void close() throws IOException {
-      output();
+    public void println(long l) {
+      String msg;
+      synchronized (this) {
+        buffer.append(l);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
-    private void output() {
-      // If nothing was output, do not log anything
-      if (baos.size() == 0) {
-        return;
+    @Override
+    public void println(float f) {
+      String msg;
+      synchronized (this) {
+        buffer.append(f);
+        msg = flushToString();
       }
-      try {
-        String message = baos.toString(StandardCharsets.UTF_8.name());
-        // Strip the new line if it exists
-        if (message.endsWith(System.lineSeparator())) {
-          message = message.substring(0, message.length() - System.lineSeparator().length());
-        }
+      publish(msg);
+    }
+
+    @Override
+    public void println(double d) {
+      String msg;
+      synchronized (this) {
+        buffer.append(d);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(char[] a) {
+      String msg;
+      synchronized (this) {
+        buffer.append(a);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(String s) {
+      String msg;
+      synchronized (this) {
+        buffer.append(s);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
 
-        publish(messageLevel, message);
-      } catch (UnsupportedEncodingException e) {
-        publish(
-            Level.SEVERE, String.format("Unable to decode string output to stdout/stderr %s", e));
+    @Override
+    public void println(Object o) {
+      String msg;
+      synchronized (this) {
+        buffer.append(o);
+        msg = flushToString();
       }
-      matched = 0;
-      baos.reset();
+      publish(msg);
     }
 
-    private void publish(Level level, String message) {
-      if (logger.isLoggable(level)) {
-        LogRecord log = new LogRecord(level, message);
+    @Override
+    public PrintStream format(String format, Object... args) {
+      return format(Locale.getDefault(), format, args);
+    }
+
+    @Override
+    public PrintStream format(Locale locale, String format, Object... args) {
+      String flushed;
+      int newlineIndex;
+      synchronized (this) {
+        int startLength = buffer.length();
+        Formatter formatter = new Formatter(buffer, locale);
+        formatter.format(format, args);
+        newlineIndex = buffer.indexOf("\n", startLength);
+        if (newlineIndex < 0) {
+          return this;
+        }
+        flushed = flushToString();
+      }
+      while (newlineIndex > 0) {
+        publish(flushed.substring(0, newlineIndex));
+        flushed = flushed.substring(newlineIndex + 1);
+        newlineIndex = flushed.indexOf('\n');
+      }
+      publish(flushed);
+      return this;
+    }
+
+    @Override
+    public synchronized PrintStream append(CharSequence cs, int start, int limit) {
+      buffer.append(cs.subSequence(start, limit));
+      return this;
+    }
+
+    // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
+    private void publish(Level messageLevel, String message) {
+      if (logger.isLoggable(messageLevel)) {
 
 Review comment:
   Either use `assert !Thread.holdsLock(this);` or `checkState(!Thread.holdsLock(this), "This thread should never hold this lock");` to guard against this case.
   
   https://docs.oracle.com/javase/7/docs/api/java/lang/Thread.html#holdsLock%28java.lang.Object%29


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391929052
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +43,328 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
-    private Logger logger;
+    private final Logger logger;
 
-    private Handler handler;
-    private String loggerName;
-    private ByteArrayOutputStream baos;
-    private Level messageLevel;
-    private int matched = 0;
+    private final Handler handler;
+    private final String loggerName;
+    private final StringBuilder buffer;
+    private final Level messageLevel;
+    private final CharsetDecoder decoder;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
+      this.decoder = Charset.defaultCharset().newDecoder();
+    }
+
+    @Override
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
+      }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
+
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      CharBuffer decoded;
+      synchronized (decoder) {
+        try {
+          decoded = decoder.decode(ByteBuffer.wrap(a, start, limit));
 
 Review comment:
   You need to use the stateful version of [CharsetDecoder.decode](https://docs.oracle.com/javase/7/docs/api/java/nio/charset/CharsetDecoder.html#decode(java.nio.ByteBuffer,%20java.nio.CharBuffer,%20boolean)), since the single-argument `decode(ByteBuffer)` used here assumes that the byte[] range represents a whole, valid String.
   
   In the other methods you would need to finish the decoding if a partial decoding is still in flight.
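
A rough sketch of the stateful decoding described above, under stated assumptions: `StreamingDecoder` and its `carry` field are hypothetical names, UTF-8 is hard-coded rather than taken from `Charset.defaultCharset()`, and this is not the code that was eventually merged. It passes `endOfInput = false` to the three-argument `decode`, so a multi-byte character split across two `write` calls is held back instead of being reported as malformed.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; decodes byte chunks that may split a multi-byte character. */
class StreamingDecoder {
  private final CharsetDecoder decoder =
      StandardCharsets.UTF_8
          .newDecoder()
          .onMalformedInput(CodingErrorAction.REPLACE)
          .onUnmappableCharacter(CodingErrorAction.REPLACE);

  // Bytes of an incomplete character left over from the previous call.
  private ByteBuffer carry = ByteBuffer.allocate(0);

  /** Decodes as much as possible and returns it; undecoded trailing bytes are kept. */
  synchronized String write(byte[] a, int offset, int length) {
    ByteBuffer in = ByteBuffer.allocate(carry.remaining() + length);
    in.put(carry).put(a, offset, length);
    in.flip();

    // UTF-8 never produces more chars than it consumes bytes.
    CharBuffer out = CharBuffer.allocate(in.remaining() + 1);

    // endOfInput=false: a trailing partial sequence stays in `in` unconsumed.
    decoder.decode(in, out, false);

    // Save whatever was not consumed for the next call.
    carry = ByteBuffer.allocate(in.remaining());
    carry.put(in);
    carry.flip();

    out.flip();
    return out.toString();
  }
}
```

Any method that flushes or closes the stream would additionally need to finish the decode (for example by decoding with `endOfInput = true`) so that leftover bytes are not silently dropped.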


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391086037
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
 
 Review comment:
   If you're careful in the above methods that append strings to the buffer to never append the '\n', you will never need this check.
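
One possible reading of this suggestion, sketched with hypothetical names (`LineSplittingBuffer` and its methods are not from the PR): split incoming text on '\n' at append time, hand back the completed lines for publishing, and keep only the trailing partial line in the buffer. Under that assumption the buffer never contains a newline, so `flushToString` has nothing to strip.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; the buffer only ever holds a partial line without '\n'. */
class LineSplittingBuffer {
  private final StringBuilder buffer = new StringBuilder();

  /** Appends text and returns completed lines, to be published outside the lock. */
  synchronized List<String> append(String s) {
    List<String> lines = new ArrayList<>();
    int start = 0;
    int newline;
    while ((newline = s.indexOf('\n', start)) >= 0) {
      buffer.append(s, start, newline); // text up to, but excluding, the newline
      lines.add(buffer.toString());
      buffer.setLength(0);
      start = newline + 1;
    }
    buffer.append(s, start, s.length()); // remainder after the last newline
    return lines;
  }

  /** Returns the buffered partial line; no trailing-newline check is needed. */
  synchronized String flushToString() {
    String result = buffer.toString();
    buffer.setLength(0);
    return result;
  }
}
```

The caller would publish each returned line after leaving the synchronized method, which keeps the "never publish while holding the lock" rule intact.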


[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606248141
 
 
   Run Java PreCommit


[GitHub] [beam] aaltay commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-604781282
 
 
   retest this please


[GitHub] [beam] scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r395335810
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +43,328 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
-    private Logger logger;
+    private final Logger logger;
 
-    private Handler handler;
-    private String loggerName;
-    private ByteArrayOutputStream baos;
-    private Level messageLevel;
-    private int matched = 0;
+    private final Handler handler;
+    private final String loggerName;
+    private final StringBuilder buffer;
+    private final Level messageLevel;
+    private final CharsetDecoder decoder;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
+      this.decoder = Charset.defaultCharset().newDecoder();
+    }
+
+    @Override
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
+      }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
+
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      CharBuffer decoded;
+      synchronized (decoder) {
+        try {
+          decoded = decoder.decode(ByteBuffer.wrap(a, start, limit));
 
 Review comment:
   Thanks for catching this. I added a unit test for raw bytes and fixed the decoding to keep a buffer of the undecoded remainder bytes.
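
One way to picture "keeping a buffer of remainders" is sketched below. This is not the code from the pull request: the class name CarryOverDecoder and its carry and drain members are hypothetical, and UTF-8 is assumed instead of the default charset so that the result does not depend on the machine. The sketch passes endOfInput=false to CharsetDecoder.decode so that a trailing incomplete character is left unread, then copies those unread bytes in front of the next write.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not the class from the pull request.
class CarryOverDecoder {
  // Assumption: UTF-8 rather than Charset.defaultCharset(), to keep the demo deterministic.
  private final CharsetDecoder decoder =
      StandardCharsets.UTF_8
          .newDecoder()
          .onMalformedInput(CodingErrorAction.REPLACE)
          .onUnmappableCharacter(CodingErrorAction.REPLACE);
  private final StringBuilder text = new StringBuilder();
  // Bytes of an incomplete character left over from the previous write.
  private ByteBuffer carry = ByteBuffer.allocate(0);

  synchronized void write(byte[] a, int offset, int length) {
    // Prepend the leftover bytes to the new input.
    ByteBuffer in = ByteBuffer.allocate(carry.remaining() + length);
    in.put(carry);
    in.put(a, offset, length);
    in.flip();

    CharBuffer chars =
        CharBuffer.allocate((int) Math.ceil(in.remaining() * decoder.maxCharsPerByte()) + 1);
    // endOfInput=false: a trailing partial character is left unread in `in`.
    decoder.decode(in, chars, false);
    chars.flip();
    text.append(chars);

    // Save whatever the decoder could not consume yet.
    byte[] rest = new byte[in.remaining()];
    in.get(rest);
    carry = ByteBuffer.wrap(rest);
  }

  // Returns the text decoded so far and clears it.
  synchronized String drain() {
    String result = text.toString();
    text.setLength(0);
    return result;
  }

  public static void main(String[] args) {
    String msg = "\u2660 \u2661"; // two card-suit symbols, 3 bytes each in UTF-8
    byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
    CarryOverDecoder d = new CarryOverDecoder();
    for (int i = 0; i < bytes.length; i++) {
      d.write(bytes, i, 1); // worst case: one byte per call
    }
    System.out.println(d.drain().equals(msg));
  }
}
```

Feeding the input one byte at a time is the harshest split; a sketch like this still has to decide what to do with leftover bytes at flush or close time, which is omitted here.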

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391071339
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
 
 Review comment:
   It would be an issue for multi-byte characters that are split across separate write calls.
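
The concern can be reproduced in isolation with a small sketch. The class and method names below (SplitDecodeDemo, decodeHalves) are hypothetical, and UTF-8 is assumed instead of the default charset. Each half of the byte array is decoded on its own, the way a decode per write call would behave.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it only illustrates the boundary problem.
class SplitDecodeDemo {
  /** Decodes bytes[0, split) and bytes[split, end) independently and concatenates the results. */
  static String decodeHalves(byte[] bytes, int split) {
    String first = new String(bytes, 0, split, StandardCharsets.UTF_8);
    String second = new String(bytes, split, bytes.length - split, StandardCharsets.UTF_8);
    return first + second;
  }

  public static void main(String[] args) {
    String msg = "\u2660"; // a single spade symbol, 3 bytes in UTF-8 (E2 99 A0)
    byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);

    // Split on a character boundary: the text survives.
    System.out.println(decodeHalves(bytes, 3).equals(msg));

    // Split inside the character: each piece is malformed on its own and is
    // replaced with U+FFFD, so the original text is lost.
    System.out.println(decodeHalves(bytes, 1).equals(msg));
  }
}
```

The second call cannot recover the character because neither half is a valid UTF-8 sequence by itself.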


[GitHub] [beam] scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
scwhittle commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-597381466
 
 
   R: @lukecwik 
   There are some questions in the change about the desired behavior when printing char arrays, and about whether we should attempt to detect newlines in strings/arrays passed to print.


[GitHub] [beam] lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r400507824
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
 ##########
 @@ -115,6 +116,22 @@ public void testLogOnClose() {
     assertThat(handler.getLogs(), hasLogItem("blah"));
   }
 
+  @Test
+  public void testLogRawBytes() {
+    PrintStream printStream = createPrintStreamAdapter();
+    String msg = "♠ ♡ ♢ ♣ ♤ ♥ ♦ ♧";
+    byte[] bytes = msg.getBytes(UTF_8);
 
 Review comment:
   You have to use the default charset for getBytes since you use the default charset in the CharsetDecoder.
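
A short sketch of why the encoding and decoding charsets have to agree. The class and method names (CharsetRoundTrip, roundTrip) are hypothetical, and ISO-8859-1 is used here only as a stand-in for "some default charset that is not UTF-8".

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of the test under review.
class CharsetRoundTrip {
  /** Encodes msg with one charset and decodes the resulting bytes with another. */
  static String roundTrip(String msg, Charset encodeWith, Charset decodeWith) {
    return new String(msg.getBytes(encodeWith), decodeWith);
  }

  public static void main(String[] args) {
    String msg = "\u2660 \u2661"; // two card-suit symbols separated by a space

    // Same charset on both sides: the message survives.
    System.out.println(
        roundTrip(msg, StandardCharsets.UTF_8, StandardCharsets.UTF_8).equals(msg));

    // Mismatched charsets: every UTF-8 byte becomes its own character.
    String mismatched = roundTrip(msg, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1);
    System.out.println(mismatched.equals(msg));
    System.out.println(mismatched.length());
  }
}
```

A test that hard-codes UTF-8 for getBytes would therefore only pass on machines whose default charset happens to be UTF-8, assuming the stream under test decodes with the default charset.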


[GitHub] [beam] lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#issuecomment-606248199
 
 
   Run Spotless PreCommit
