You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/04/09 02:11:16 UTC
[beam] branch master updated: Ensure that empty messages are not
flushed to handler.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 84d5402 Ensure that empty messages are not flushed to handler.
new 9280e68 Merge pull request #11351 from scwhittle/fix_log
84d5402 is described below
commit 84d5402b640a19dd32efc6174da6f41d7e320be5
Author: Sam Whittle <sa...@google.com>
AuthorDate: Wed Apr 8 12:18:29 2020 -0700
Ensure that empty messages are not flushed to handler.
---
.../JulHandlerPrintStreamAdapterFactory.java | 44 +++++++++++-----------
.../JulHandlerPrintStreamAdapterFactoryTest.java | 21 +++++++++++
2 files changed, 43 insertions(+), 22 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index 1eb935a..e4de8cb9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -91,13 +91,16 @@ class JulHandlerPrintStreamAdapterFactory {
@Override
public void flush() {
- publish(flushToString());
+ publishIfNonEmpty(flushToString());
}
private synchronized String flushToString() {
if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
buffer.setLength(buffer.length() - 1);
}
+ if (buffer.length() == 0) {
+ return null;
+ }
String result = buffer.toString();
buffer.setLength(0);
return result;
@@ -163,9 +166,7 @@ class JulHandlerPrintStreamAdapterFactory {
}
}
}
- if (msg != null) {
- publish(msg);
- }
+ publishIfNonEmpty(msg);
}
@Override
@@ -214,7 +215,7 @@ class JulHandlerPrintStreamAdapterFactory {
}
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -228,7 +229,7 @@ class JulHandlerPrintStreamAdapterFactory {
}
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -248,7 +249,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(b);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -258,7 +259,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(c);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -268,7 +269,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(i);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -278,7 +279,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(l);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -288,7 +289,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(f);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -298,7 +299,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(d);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -308,7 +309,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(a);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -318,7 +319,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(s);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -328,7 +329,7 @@ class JulHandlerPrintStreamAdapterFactory {
buffer.append(o);
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
}
@Override
@@ -348,7 +349,7 @@ class JulHandlerPrintStreamAdapterFactory {
}
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
return this;
}
@@ -370,15 +371,18 @@ class JulHandlerPrintStreamAdapterFactory {
}
msg = flushToString();
}
- publish(msg);
+ publishIfNonEmpty(msg);
return this;
}
// Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
- private void publish(Level messageLevel, String message) {
+ private void publishIfNonEmpty(String message) {
checkState(
!Thread.holdsLock(this),
"BEAM-9399: publish should not be called with the lock as it may cause deadlock");
+ if (message == null || message.isEmpty()) {
+ return;
+ }
if (logger.isLoggable(messageLevel)) {
if (outputWarning.compareAndSet(false, true)) {
LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER);
@@ -390,10 +394,6 @@ class JulHandlerPrintStreamAdapterFactory {
handler.publish(log);
}
}
-
- private void publish(String message) {
- publish(messageLevel, message);
- }
}
/**
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
index a93c26b..ae0396f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.logging;
import static org.apache.beam.runners.dataflow.worker.LogRecordMatcher.hasLogItem;
+import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -132,6 +133,26 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg));
}
+ @Test
+ public void testNoEmptyMessages() {
+ try (PrintStream printStream = createPrintStreamAdapter()) {
+ printStream.println("blah");
+ printStream.print("\n");
+ printStream.flush();
+ printStream.println("");
+ printStream.flush();
+ printStream.print("");
+ printStream.flush();
+ byte[] bytes = "a".getBytes(Charset.defaultCharset());
+ printStream.write(bytes, 0, 0);
+ printStream.flush();
+ }
+
+ for (LogRecord log : handler.getLogs()) {
+ assertThat(log.getMessage(), not(blankOrNullString()));
+ }
+ }
+
private PrintStream createPrintStreamAdapter() {
return JulHandlerPrintStreamAdapterFactory.create(handler, LOGGER_NAME, Level.INFO);
}