You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/06/04 00:25:40 UTC

[beam] branch master updated: [BEAM-14556] Honor the formatter installed on the root handler. (#17820)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dce7b8857f [BEAM-14556] Honor the formatter installed on the root handler. (#17820)
4dce7b8857f is described below

commit 4dce7b8857f37608321253073745fe7611a48af9
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Jun 3 17:25:32 2022 -0700

    [BEAM-14556] Honor the formatter installed on the root handler. (#17820)
    
    This allows for an MDC to be integrated and/or other options.
---
 .../logging/DataflowWorkerLoggingHandler.java      |  5 ++-
 .../logging/DataflowWorkerLoggingHandlerTest.java  | 37 ++++++++++++++++++++++
 .../fn/harness/logging/BeamFnLoggingClient.java    |  5 +--
 .../harness/logging/BeamFnLoggingClientTest.java   | 30 +++++++++++++++++-
 4 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index f84ce2eab2d..b87bae31d19 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -35,7 +35,6 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.EnumMap;
 import java.util.logging.ErrorManager;
-import java.util.logging.Formatter;
 import java.util.logging.Handler;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
@@ -103,6 +102,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
    * or negative.
    */
   DataflowWorkerLoggingHandler(Supplier<OutputStream> factory, long sizeLimit) throws IOException {
+    this.setFormatter(new SimpleFormatter());
     this.outputStreamFactory = factory;
     this.generatorFactory = new ObjectMapper().getFactory();
     this.sizeLimit = sizeLimit < 1 ? Long.MAX_VALUE : sizeLimit;
@@ -142,7 +142,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
           "severity",
           MoreObjects.firstNonNull(LEVELS.get(record.getLevel()), record.getLevel().getName()));
       // Write the other labels.
-      writeIfNotEmpty("message", formatter.formatMessage(record));
+      writeIfNotEmpty("message", getFormatter().formatMessage(record));
       writeIfNotEmpty("thread", String.valueOf(record.getThreadID()));
       writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId());
       writeIfNotEmpty("stage", DataflowWorkerLoggingMDC.getStageName());
@@ -344,5 +344,4 @@ public class DataflowWorkerLoggingHandler extends Handler {
   private final long sizeLimit;
   private final Supplier<OutputStream> outputStreamFactory;
   private final JsonFactory generatorFactory;
-  private final Formatter formatter = new SimpleFormatter();
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
index ac2c5c9e63e..0f6d85dd3fe 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
@@ -27,8 +27,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.logging.Formatter;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
+import java.util.logging.SimpleFormatter;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
@@ -44,6 +46,7 @@ import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.MDC;
 
 /** Unit tests for {@link DataflowWorkerLoggingHandler}. */
 @RunWith(JUnit4.class)
@@ -94,6 +97,17 @@ public class DataflowWorkerLoggingHandlerTest {
     return new String(output.toByteArray(), StandardCharsets.UTF_8);
   }
 
+  private static String createJson(LogRecord record, Formatter formatter) throws IOException {
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
+    DataflowWorkerLoggingHandler handler = new DataflowWorkerLoggingHandler(factory, 0);
+    handler.setFormatter(formatter);
+    // Format the record as JSON.
+    handler.publish(record);
+    // Decode the binary output as UTF-8 and return the generated string.
+    return new String(output.toByteArray(), StandardCharsets.UTF_8);
+  }
+
   /**
    * Encodes a {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry} into a Json string.
    */
@@ -206,6 +220,29 @@ public class DataflowWorkerLoggingHandlerTest {
         createJson(createLogRecord("test.message", null /* throwable */)));
   }
 
+  @Test
+  public void testWithMessageUsingCustomFormatter() throws IOException {
+    DataflowWorkerLoggingMDC.setJobId("testJobId");
+    DataflowWorkerLoggingMDC.setWorkerId("testWorkerId");
+    DataflowWorkerLoggingMDC.setWorkId("testWorkId");
+
+    Formatter customFormatter =
+        new SimpleFormatter() {
+          @Override
+          public synchronized String formatMessage(LogRecord record) {
+            return MDC.get("testMdcKey") + ":" + super.formatMessage(record);
+          }
+        };
+    MDC.put("testMdcKey", "testMdcValue");
+
+    assertEquals(
+        "{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
+            + "\"message\":\"testMdcValue:test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
+            + "\"worker\":\"testWorkerId\",\"work\":\"testWorkId\",\"logger\":\"LoggerName\"}"
+            + System.lineSeparator(),
+        createJson(createLogRecord("test.message", null /* throwable */), customFormatter));
+  }
+
   @Test
   public void testWithMessageRequiringJulFormatting() throws IOException {
     assertEquals(
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index be9a1b3c419..f7650334a64 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -86,7 +86,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
           .put(SdkHarnessOptions.LogLevel.TRACE, Level.FINEST)
           .build();
 
-  private static final Formatter FORMATTER = new SimpleFormatter();
+  private static final Formatter DEFAULT_FORMATTER = new SimpleFormatter();
 
   /**
    * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB, this
@@ -145,6 +145,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
     inboundObserver = new LogControlObserver();
     logRecordHandler = new LogRecordHandler();
     logRecordHandler.setLevel(Level.ALL);
+    logRecordHandler.setFormatter(DEFAULT_FORMATTER);
     logRecordHandler.executeOn(options.as(GcsOptions.class).getExecutorService());
     outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>) stub.logging(inboundObserver);
     rootLogger.addHandler(logRecordHandler);
@@ -205,7 +206,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
       BeamFnApi.LogEntry.Builder builder =
           BeamFnApi.LogEntry.newBuilder()
               .setSeverity(severity)
-              .setMessage(FORMATTER.formatMessage(record))
+              .setMessage(getFormatter().formatMessage(record))
               .setThread(Integer.toString(record.getThreadID()))
               .setTimestamp(
                   Timestamp.newBuilder()
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index ede0739c34b..109eb7980d9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -30,10 +30,12 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Handler;
 import java.util.logging.Level;
 import java.util.logging.LogManager;
 import java.util.logging.LogRecord;
 import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -53,6 +55,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.MDC;
 
 /** Tests for {@link BeamFnLoggingClient}. */
 @RunWith(JUnit4.class)
@@ -86,6 +89,15 @@ public class BeamFnLoggingClientTest {
           .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
           .setLogLocation("LoggerName")
           .build();
+  private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_CUSTOM_FORMATTER =
+      BeamFnApi.LogEntry.newBuilder()
+          .setInstructionId("instruction-1")
+          .setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG)
+          .setMessage("testMdcValue:Message")
+          .setThread("12345")
+          .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
+          .setLogLocation("LoggerName")
+          .build();
   private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION =
       BeamFnApi.LogEntry.newBuilder()
           .setInstructionId("instruction-1")
@@ -163,6 +175,20 @@ public class BeamFnLoggingClientTest {
       // Should not be filtered because the default log level override for ConfiguredLogger is DEBUG
       configuredLogger.log(TEST_RECORD);
       configuredLogger.log(TEST_RECORD_WITH_EXCEPTION);
+
+      // Ensure that configuring a custom formatter on the logging handler will be honored.
+      for (Handler handler : rootLogger.getHandlers()) {
+        handler.setFormatter(
+            new SimpleFormatter() {
+              @Override
+              public synchronized String formatMessage(LogRecord record) {
+                return MDC.get("testMdcKey") + ":" + super.formatMessage(record);
+              }
+            });
+      }
+      MDC.put("testMdcKey", "testMdcValue");
+      configuredLogger.log(TEST_RECORD);
+
       client.close();
 
       // Verify that after close, log levels are reset.
@@ -171,7 +197,9 @@ public class BeamFnLoggingClientTest {
 
       assertTrue(clientClosedStream.get());
       assertTrue(channel.isShutdown());
-      assertThat(values, contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION));
+      assertThat(
+          values,
+          contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION, TEST_ENTRY_WITH_CUSTOM_FORMATTER));
     } finally {
       server.shutdownNow();
     }