You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/04/09 02:11:16 UTC

[beam] branch master updated: Ensure that empty messages are not flushed to handler.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 84d5402  Ensure that empty messages are not flushed to handler.
     new 9280e68  Merge pull request #11351 from scwhittle/fix_log
84d5402 is described below

commit 84d5402b640a19dd32efc6174da6f41d7e320be5
Author: Sam Whittle <sa...@google.com>
AuthorDate: Wed Apr 8 12:18:29 2020 -0700

    Ensure that empty messages are not flushed to handler.
---
 .../JulHandlerPrintStreamAdapterFactory.java       | 44 +++++++++++-----------
 .../JulHandlerPrintStreamAdapterFactoryTest.java   | 21 +++++++++++
 2 files changed, 43 insertions(+), 22 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index 1eb935a..e4de8cb9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -91,13 +91,16 @@ class JulHandlerPrintStreamAdapterFactory {
 
     @Override
     public void flush() {
-      publish(flushToString());
+      publishIfNonEmpty(flushToString());
     }
 
     private synchronized String flushToString() {
       if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
         buffer.setLength(buffer.length() - 1);
       }
+      if (buffer.length() == 0) {
+        return null;
+      }
       String result = buffer.toString();
       buffer.setLength(0);
       return result;
@@ -163,9 +166,7 @@ class JulHandlerPrintStreamAdapterFactory {
           }
         }
       }
-      if (msg != null) {
-        publish(msg);
-      }
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -214,7 +215,7 @@ class JulHandlerPrintStreamAdapterFactory {
         }
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -228,7 +229,7 @@ class JulHandlerPrintStreamAdapterFactory {
         }
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -248,7 +249,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(b);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -258,7 +259,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(c);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -268,7 +269,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(i);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -278,7 +279,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(l);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -288,7 +289,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(f);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -298,7 +299,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(d);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -308,7 +309,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(a);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -318,7 +319,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(s);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -328,7 +329,7 @@ class JulHandlerPrintStreamAdapterFactory {
         buffer.append(o);
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
     }
 
     @Override
@@ -348,7 +349,7 @@ class JulHandlerPrintStreamAdapterFactory {
         }
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
       return this;
     }
 
@@ -370,15 +371,18 @@ class JulHandlerPrintStreamAdapterFactory {
         }
         msg = flushToString();
       }
-      publish(msg);
+      publishIfNonEmpty(msg);
       return this;
     }
 
     // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
-    private void publish(Level messageLevel, String message) {
+    private void publishIfNonEmpty(String message) {
       checkState(
           !Thread.holdsLock(this),
           "BEAM-9399: publish should not be called with the lock as it may cause deadlock");
+      if (message == null || message.isEmpty()) {
+        return;
+      }
       if (logger.isLoggable(messageLevel)) {
         if (outputWarning.compareAndSet(false, true)) {
           LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER);
@@ -390,10 +394,6 @@ class JulHandlerPrintStreamAdapterFactory {
         handler.publish(log);
       }
     }
-
-    private void publish(String message) {
-      publish(messageLevel, message);
-    }
   }
 
   /**
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
index a93c26b..ae0396f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker.logging;
 
 import static org.apache.beam.runners.dataflow.worker.LogRecordMatcher.hasLogItem;
+import static org.hamcrest.Matchers.blankOrNullString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -132,6 +133,26 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
     assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg));
   }
 
+  @Test
+  public void testNoEmptyMessages() {
+    try (PrintStream printStream = createPrintStreamAdapter()) {
+      printStream.println("blah");
+      printStream.print("\n");
+      printStream.flush();
+      printStream.println("");
+      printStream.flush();
+      printStream.print("");
+      printStream.flush();
+      byte[] bytes = "a".getBytes(Charset.defaultCharset());
+      printStream.write(bytes, 0, 0);
+      printStream.flush();
+    }
+
+    for (LogRecord log : handler.getLogs()) {
+      assertThat(log.getMessage(), not(blankOrNullString()));
+    }
+  }
+
   private PrintStream createPrintStreamAdapter() {
     return JulHandlerPrintStreamAdapterFactory.create(handler, LOGGER_NAME, Level.INFO);
   }