You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mm...@apache.org on 2022/08/10 09:01:56 UTC
[beam] branch master updated: Adhoc: Fix logging in Spark runner to avoid unnecessary creation of strings (#22638)
This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 00b5605852c Adhoc: Fix logging in Spark runner to avoid unnecessary creation of strings (#22638)
00b5605852c is described below
commit 00b5605852c8df92a220b04079d5ef7c7cef7a38
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Wed Aug 10 11:01:50 2022 +0200
Adhoc: Fix logging in Spark runner to avoid unnecessary creation of strings (#22638)
---
.../aggregators/AggregatorsAccumulator.java | 2 +-
.../metrics/MetricsAccumulator.java | 2 +-
.../aggregators/AggregatorsAccumulator.java | 2 +-
.../metrics/MetricsAccumulator.java | 2 +-
.../beam/runners/spark/SparkPipelineRunner.java | 14 +++++-------
.../org/apache/beam/runners/spark/SparkRunner.java | 2 +-
.../beam/runners/spark/SparkRunnerDebugger.java | 2 +-
.../apache/beam/runners/spark/TestSparkRunner.java | 2 +-
.../spark/aggregators/AggregatorsAccumulator.java | 2 +-
.../beam/runners/spark/io/MicrobatchSource.java | 7 +++---
.../beam/runners/spark/io/SourceDStream.java | 2 +-
.../runners/spark/metrics/MetricsAccumulator.java | 2 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 25 +++++++++++-----------
.../streaming/WatermarkSyncedDStream.java | 6 ++----
14 files changed, 32 insertions(+), 40 deletions(-)
diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
index da70a0d56c3..bdba5509a9b 100644
--- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
@@ -52,7 +52,7 @@ public class AggregatorsAccumulator {
instance = accumulator;
}
}
- LOG.info("Instantiated aggregators accumulator: " + instance.value());
+ LOG.info("Instantiated aggregators accumulator: {}", instance.value());
}
}
diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
index e299aba92ff..b319ae36487 100644
--- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
+++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
@@ -53,7 +53,7 @@ public class MetricsAccumulator {
instance = accumulator;
}
}
- LOG.info("Instantiated metrics accumulator: " + instance.value());
+ LOG.info("Instantiated metrics accumulator: {}", instance.value());
} else {
instance.reset();
}
diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
index da70a0d56c3..bdba5509a9b 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
@@ -52,7 +52,7 @@ public class AggregatorsAccumulator {
instance = accumulator;
}
}
- LOG.info("Instantiated aggregators accumulator: " + instance.value());
+ LOG.info("Instantiated aggregators accumulator: {}", instance.value());
}
}
diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
index e299aba92ff..b319ae36487 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
@@ -53,7 +53,7 @@ public class MetricsAccumulator {
instance = accumulator;
}
}
- LOG.info("Instantiated metrics accumulator: " + instance.value());
+ LOG.info("Instantiated metrics accumulator: {}", instance.value());
} else {
instance.reset();
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index b7495548e4d..b88ec308e15 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -121,7 +121,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
translator.createTranslationContext(jsc, pipelineOptions, jobInfo);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
- LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
+ LOG.info("Running job {} on Spark master {}", jobInfo.jobId(), jsc.master());
if (isStreaming) {
final JavaStreamingContext jssc =
@@ -157,9 +157,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
() -> {
translator.translate(fusedPipeline, context);
LOG.info(
- String.format(
- "Job %s: Pipeline translated successfully. Computing outputs",
- jobInfo.jobId()));
+ "Job {}: Pipeline translated successfully. Computing outputs", jobInfo.jobId());
context.computeOutputs();
jssc.start();
@@ -169,7 +167,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
LOG.warn("Streaming context interrupted, shutting down.", e);
}
jssc.stop();
- LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+ LOG.info("Job {} finished.", jobInfo.jobId());
});
result = new SparkPipelineResult.PortableStreamingMode(submissionFuture, jssc);
} else {
@@ -178,11 +176,9 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
() -> {
translator.translate(fusedPipeline, context);
LOG.info(
- String.format(
- "Job %s: Pipeline translated successfully. Computing outputs",
- jobInfo.jobId()));
+ "Job {}: Pipeline translated successfully. Computing outputs", jobInfo.jobId());
context.computeOutputs();
- LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+ LOG.info("Job {} finished.", jobInfo.jobId());
});
result = new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ceca00edbd8..2fe92bb29f1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -188,7 +188,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
// register user-defined listeners.
for (JavaStreamingListener listener :
pipelineOptions.as(SparkContextOptions.class).getListeners()) {
- LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
+ LOG.info("Registered listener {}.", listener.getClass().getSimpleName());
jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 5f1da96e29d..31e3e34ee0d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -113,7 +113,7 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul
SparkContextFactory.stopSparkContext(jsc);
String debugString = visitor.getDebugString();
- LOG.info("Translated Native Spark pipeline:\n" + debugString);
+ LOG.info("Translated Native Spark pipeline:\n{}", debugString);
return new DebugSparkPipelineResult(debugString);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index abcebbc23c5..646781574c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -90,7 +90,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
MetricsAccumulator.clear();
GlobalWatermarkHolder.clear();
- LOG.info("About to run test pipeline " + options.getJobName());
+ LOG.info("About to run test pipeline {}", options.getJobName());
// if the pipeline was executed in streaming mode, validate aggregators.
if (isForceStreaming) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
index 96f9098c351..007ca9cdc12 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -75,7 +75,7 @@ public class AggregatorsAccumulator {
instance = accumulator;
}
}
- LOG.info("Instantiated aggregators accumulator: " + instance.value());
+ LOG.info("Instantiated aggregators accumulator: {}", instance.value());
}
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 3a3fd1c8c4f..9da520f6b9c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -309,10 +309,9 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
@Override
public Reader call() throws Exception {
LOG.info(
- "No cached reader found for split: ["
- + source
- + "]. Creating new reader at checkpoint mark "
- + checkpointMark);
+ "No cached reader found for split: [{}]. Creating new reader at checkpoint mark {}",
+ source,
+ checkpointMark);
return new Reader(source.createReader(options, checkpointMark));
}
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 0bd9143b50b..4b20f4e8713 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -183,7 +183,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
proportionalDuration.isLongerThan(lowerBoundDuration)
? proportionalDuration
: lowerBoundDuration;
- LOG.info("Read duration set to: " + readDuration);
+ LOG.info("Read duration set to: {}", readDuration);
return readDuration;
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index 3eb3cab55c9..c0a43f35693 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -76,7 +76,7 @@ public class MetricsAccumulator {
instance = accumulator;
}
}
- LOG.info("Instantiated metrics accumulator: " + instance.value());
+ LOG.info("Instantiated metrics accumulator: {}", instance.value());
} else {
instance.reset();
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 05187509269..81bd254678e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -279,7 +279,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
FluentIterable.from(JavaConversions.asJavaIterable(encodedElements))
.transform(bytes -> CoderHelpers.fromByteArray(bytes, wvCoder));
- LOG.trace(logPrefix + ": input elements: {}", elements);
+ LOG.trace("{}: input elements: {}", logPrefix, elements);
// Incoming expired windows are filtered based on
// timerInternals.currentInputWatermarkTime() and the configured allowed
@@ -294,7 +294,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
LateDataUtils.dropExpiredWindows(
key, elements, timerInternals, windowingStrategy, droppedDueToLateness));
- LOG.trace(logPrefix + ": non expired input elements: {}", nonExpiredElements);
+ LOG.trace("{}: non expired input elements: {}", logPrefix, nonExpiredElements);
reduceFnRunner.processElements(nonExpiredElements);
} catch (final Exception e) {
@@ -306,8 +306,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
}
try {
// advance the watermark to HWM to fire by timers.
- LOG.debug(
- logPrefix + ": timerInternals before advance are {}", timerInternals.toString());
+ LOG.debug("{}: timerInternals before advance are {}", logPrefix, timerInternals);
// store the highWatermark as the new inputWatermark to calculate triggers
timerInternals.advanceWatermark();
@@ -317,7 +316,9 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
timerInternals.getTimers(), timerInternals.currentInputWatermarkTime());
LOG.debug(
- logPrefix + ": timers eligible for processing are {}", timersEligibleForProcessing);
+ "{}: timers eligible for processing are {}",
+ logPrefix,
+ timersEligibleForProcessing);
// Note that at this point, the watermark has already advanced since
// timerInternals.advanceWatermark() has been called and the highWatermark
@@ -352,12 +353,10 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
SparkTimerInternals.serializeTimers(
timerInternals.getTimers(), timerDataCoder));
- /*
- Not something we want to happen in production, but is very helpful
- when debugging - TRACE.
- */
- LOG.trace(logPrefix + ": output elements are {}", Joiner.on(", ").join(outputs));
-
+ if (LOG.isTraceEnabled()) {
+ // Not something we want to happen in production, but is very helpful when debugging.
+ LOG.trace("{}: output elements are {}", logPrefix, Joiner.on(", ").join(outputs));
+ }
// persist Spark's state by outputting.
final List<byte[]> serOutput = CoderHelpers.toByteArrays(outputs, wvKvIterCoder);
return new Tuple2<>(encodedKey, new Tuple2<>(updated, serOutput));
@@ -440,12 +439,12 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
// log if there's something to log.
final long lateDropped = droppedDueToLateness.getCumulative();
if (lateDropped > 0) {
- LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
+ LOG.info("Dropped {} elements due to lateness.", lateDropped);
droppedDueToLateness.inc(-droppedDueToLateness.getCumulative());
}
final long closedWindowDropped = droppedDueToClosedWindow.getCumulative();
if (closedWindowDropped > 0) {
- LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
+ LOG.info("Dropped {} elements due to closed window.", closedWindowDropped);
droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative());
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
index 637b04a3f10..24b5a0f867f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
@@ -114,8 +114,7 @@ class WatermarkSyncedDStream<T> extends InputDStream<WindowedValue<T>> {
final long batchTime = validTime.milliseconds();
LOG.trace(
- "BEFORE waiting for watermark sync, "
- + "LastWatermarkedBatchTime: {}, current batch time: {}",
+ "BEFORE waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}",
GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
batchTime);
@@ -133,8 +132,7 @@ class WatermarkSyncedDStream<T> extends InputDStream<WindowedValue<T>> {
LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(batchDuration));
LOG.trace(
- "AFTER waiting for watermark sync, "
- + "LastWatermarkedBatchTime: {}, current batch time: {}",
+ "AFTER waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}",
GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
batchTime);