You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/06/04 00:25:40 UTC
[beam] branch master updated: [BEAM-14556] Honor the formatter installed on the root handler. (#17820)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4dce7b8857f [BEAM-14556] Honor the formatter installed on the root handler. (#17820)
4dce7b8857f is described below
commit 4dce7b8857f37608321253073745fe7611a48af9
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Jun 3 17:25:32 2022 -0700
[BEAM-14556] Honor the formatter installed on the root handler. (#17820)
This allows users to install a custom formatter on the logging handler, for example to integrate MDC values into log messages.
---
.../logging/DataflowWorkerLoggingHandler.java | 5 ++-
.../logging/DataflowWorkerLoggingHandlerTest.java | 37 ++++++++++++++++++++++
.../fn/harness/logging/BeamFnLoggingClient.java | 5 +--
.../harness/logging/BeamFnLoggingClientTest.java | 30 +++++++++++++++++-
4 files changed, 71 insertions(+), 6 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index f84ce2eab2d..b87bae31d19 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -35,7 +35,6 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.EnumMap;
import java.util.logging.ErrorManager;
-import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.SimpleFormatter;
@@ -103,6 +102,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
* or negative.
*/
DataflowWorkerLoggingHandler(Supplier<OutputStream> factory, long sizeLimit) throws IOException {
+ this.setFormatter(new SimpleFormatter());
this.outputStreamFactory = factory;
this.generatorFactory = new ObjectMapper().getFactory();
this.sizeLimit = sizeLimit < 1 ? Long.MAX_VALUE : sizeLimit;
@@ -142,7 +142,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
"severity",
MoreObjects.firstNonNull(LEVELS.get(record.getLevel()), record.getLevel().getName()));
// Write the other labels.
- writeIfNotEmpty("message", formatter.formatMessage(record));
+ writeIfNotEmpty("message", getFormatter().formatMessage(record));
writeIfNotEmpty("thread", String.valueOf(record.getThreadID()));
writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId());
writeIfNotEmpty("stage", DataflowWorkerLoggingMDC.getStageName());
@@ -344,5 +344,4 @@ public class DataflowWorkerLoggingHandler extends Handler {
private final long sizeLimit;
private final Supplier<OutputStream> outputStreamFactory;
private final JsonFactory generatorFactory;
- private final Formatter formatter = new SimpleFormatter();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
index ac2c5c9e63e..0f6d85dd3fe 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
@@ -27,8 +27,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
+import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
+import java.util.logging.SimpleFormatter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
@@ -44,6 +46,7 @@ import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.MDC;
/** Unit tests for {@link DataflowWorkerLoggingHandler}. */
@RunWith(JUnit4.class)
@@ -94,6 +97,17 @@ public class DataflowWorkerLoggingHandlerTest {
return new String(output.toByteArray(), StandardCharsets.UTF_8);
}
+ private static String createJson(LogRecord record, Formatter formatter) throws IOException {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
+ DataflowWorkerLoggingHandler handler = new DataflowWorkerLoggingHandler(factory, 0);
+ handler.setFormatter(formatter);
+ // Format the record as JSON.
+ handler.publish(record);
+ // Decode the binary output as UTF-8 and return the generated string.
+ return new String(output.toByteArray(), StandardCharsets.UTF_8);
+ }
+
/**
* Encodes a {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry} into a Json string.
*/
@@ -206,6 +220,29 @@ public class DataflowWorkerLoggingHandlerTest {
createJson(createLogRecord("test.message", null /* throwable */)));
}
+ @Test
+ public void testWithMessageUsingCustomFormatter() throws IOException {
+ DataflowWorkerLoggingMDC.setJobId("testJobId");
+ DataflowWorkerLoggingMDC.setWorkerId("testWorkerId");
+ DataflowWorkerLoggingMDC.setWorkId("testWorkId");
+
+ Formatter customFormatter =
+ new SimpleFormatter() {
+ @Override
+ public synchronized String formatMessage(LogRecord record) {
+ return MDC.get("testMdcKey") + ":" + super.formatMessage(record);
+ }
+ };
+ MDC.put("testMdcKey", "testMdcValue");
+
+ assertEquals(
+ "{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
+ + "\"message\":\"testMdcValue:test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
+ + "\"worker\":\"testWorkerId\",\"work\":\"testWorkId\",\"logger\":\"LoggerName\"}"
+ + System.lineSeparator(),
+ createJson(createLogRecord("test.message", null /* throwable */), customFormatter));
+ }
+
@Test
public void testWithMessageRequiringJulFormatting() throws IOException {
assertEquals(
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index be9a1b3c419..f7650334a64 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -86,7 +86,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
.put(SdkHarnessOptions.LogLevel.TRACE, Level.FINEST)
.build();
- private static final Formatter FORMATTER = new SimpleFormatter();
+ private static final Formatter DEFAULT_FORMATTER = new SimpleFormatter();
/**
* The number of log messages that will be buffered. Assuming log messages are at most 1 KiB, this
@@ -145,6 +145,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
inboundObserver = new LogControlObserver();
logRecordHandler = new LogRecordHandler();
logRecordHandler.setLevel(Level.ALL);
+ logRecordHandler.setFormatter(DEFAULT_FORMATTER);
logRecordHandler.executeOn(options.as(GcsOptions.class).getExecutorService());
outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>) stub.logging(inboundObserver);
rootLogger.addHandler(logRecordHandler);
@@ -205,7 +206,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
BeamFnApi.LogEntry.Builder builder =
BeamFnApi.LogEntry.newBuilder()
.setSeverity(severity)
- .setMessage(FORMATTER.formatMessage(record))
+ .setMessage(getFormatter().formatMessage(record))
.setThread(Integer.toString(record.getThreadID()))
.setTimestamp(
Timestamp.newBuilder()
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index ede0739c34b..109eb7980d9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -30,10 +30,12 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -53,6 +55,7 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.MDC;
/** Tests for {@link BeamFnLoggingClient}. */
@RunWith(JUnit4.class)
@@ -86,6 +89,15 @@ public class BeamFnLoggingClientTest {
.setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
.setLogLocation("LoggerName")
.build();
+ private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_CUSTOM_FORMATTER =
+ BeamFnApi.LogEntry.newBuilder()
+ .setInstructionId("instruction-1")
+ .setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG)
+ .setMessage("testMdcValue:Message")
+ .setThread("12345")
+ .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
+ .setLogLocation("LoggerName")
+ .build();
private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION =
BeamFnApi.LogEntry.newBuilder()
.setInstructionId("instruction-1")
@@ -163,6 +175,20 @@ public class BeamFnLoggingClientTest {
// Should not be filtered because the default log level override for ConfiguredLogger is DEBUG
configuredLogger.log(TEST_RECORD);
configuredLogger.log(TEST_RECORD_WITH_EXCEPTION);
+
+ // Ensure that configuring a custom formatter on the logging handler will be honored.
+ for (Handler handler : rootLogger.getHandlers()) {
+ handler.setFormatter(
+ new SimpleFormatter() {
+ @Override
+ public synchronized String formatMessage(LogRecord record) {
+ return MDC.get("testMdcKey") + ":" + super.formatMessage(record);
+ }
+ });
+ }
+ MDC.put("testMdcKey", "testMdcValue");
+ configuredLogger.log(TEST_RECORD);
+
client.close();
// Verify that after close, log levels are reset.
@@ -171,7 +197,9 @@ public class BeamFnLoggingClientTest {
assertTrue(clientClosedStream.get());
assertTrue(channel.isShutdown());
- assertThat(values, contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION));
+ assertThat(
+ values,
+ contains(TEST_ENTRY, TEST_ENTRY_WITH_EXCEPTION, TEST_ENTRY_WITH_CUSTOM_FORMATTER));
} finally {
server.shutdownNow();
}