You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by mm...@apache.org on 2022/08/10 09:01:56 UTC

[beam] branch master updated: Adhoc: Fix logging in Spark runner to avoid unnecessary creation of strings (#22638)

This is an automated email from the ASF dual-hosted git repository.

mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 00b5605852c Adhoc: Fix logging in Spark runner to avoid unnecessary creation of strings (#22638)
00b5605852c is described below

commit 00b5605852c8df92a220b04079d5ef7c7cef7a38
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Wed Aug 10 11:01:50 2022 +0200

    Adhoc: Fix logging in Spark runner to avoid unnecessary creation of strings (#22638)
---
 .../aggregators/AggregatorsAccumulator.java        |  2 +-
 .../metrics/MetricsAccumulator.java                |  2 +-
 .../aggregators/AggregatorsAccumulator.java        |  2 +-
 .../metrics/MetricsAccumulator.java                |  2 +-
 .../beam/runners/spark/SparkPipelineRunner.java    | 14 +++++-------
 .../org/apache/beam/runners/spark/SparkRunner.java |  2 +-
 .../beam/runners/spark/SparkRunnerDebugger.java    |  2 +-
 .../apache/beam/runners/spark/TestSparkRunner.java |  2 +-
 .../spark/aggregators/AggregatorsAccumulator.java  |  2 +-
 .../beam/runners/spark/io/MicrobatchSource.java    |  7 +++---
 .../beam/runners/spark/io/SourceDStream.java       |  2 +-
 .../runners/spark/metrics/MetricsAccumulator.java  |  2 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java        | 25 +++++++++++-----------
 .../streaming/WatermarkSyncedDStream.java          |  6 ++----
 14 files changed, 32 insertions(+), 40 deletions(-)

diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
index da70a0d56c3..bdba5509a9b 100644
--- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
@@ -52,7 +52,7 @@ public class AggregatorsAccumulator {
           instance = accumulator;
         }
       }
-      LOG.info("Instantiated aggregators accumulator: " + instance.value());
+      LOG.info("Instantiated aggregators accumulator: {}", instance.value());
     }
   }
 
diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
index e299aba92ff..b319ae36487 100644
--- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
+++ b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
@@ -53,7 +53,7 @@ public class MetricsAccumulator {
           instance = accumulator;
         }
       }
-      LOG.info("Instantiated metrics accumulator: " + instance.value());
+      LOG.info("Instantiated metrics accumulator: {}", instance.value());
     } else {
       instance.reset();
     }
diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
index da70a0d56c3..bdba5509a9b 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
@@ -52,7 +52,7 @@ public class AggregatorsAccumulator {
           instance = accumulator;
         }
       }
-      LOG.info("Instantiated aggregators accumulator: " + instance.value());
+      LOG.info("Instantiated aggregators accumulator: {}", instance.value());
     }
   }
 
diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
index e299aba92ff..b319ae36487 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
@@ -53,7 +53,7 @@ public class MetricsAccumulator {
           instance = accumulator;
         }
       }
-      LOG.info("Instantiated metrics accumulator: " + instance.value());
+      LOG.info("Instantiated metrics accumulator: {}", instance.value());
     } else {
       instance.reset();
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index b7495548e4d..b88ec308e15 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -121,7 +121,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
         translator.createTranslationContext(jsc, pipelineOptions, jobInfo);
     final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
-    LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
+    LOG.info("Running job {} on Spark master {}", jobInfo.jobId(), jsc.master());
 
     if (isStreaming) {
       final JavaStreamingContext jssc =
@@ -157,9 +157,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
               () -> {
                 translator.translate(fusedPipeline, context);
                 LOG.info(
-                    String.format(
-                        "Job %s: Pipeline translated successfully. Computing outputs",
-                        jobInfo.jobId()));
+                    "Job {}: Pipeline translated successfully. Computing outputs", jobInfo.jobId());
                 context.computeOutputs();
 
                 jssc.start();
@@ -169,7 +167,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
                   LOG.warn("Streaming context interrupted, shutting down.", e);
                 }
                 jssc.stop();
-                LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+                LOG.info("Job {} finished.", jobInfo.jobId());
               });
       result = new SparkPipelineResult.PortableStreamingMode(submissionFuture, jssc);
     } else {
@@ -178,11 +176,9 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
               () -> {
                 translator.translate(fusedPipeline, context);
                 LOG.info(
-                    String.format(
-                        "Job %s: Pipeline translated successfully. Computing outputs",
-                        jobInfo.jobId()));
+                    "Job {}: Pipeline translated successfully. Computing outputs", jobInfo.jobId());
                 context.computeOutputs();
-                LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+                LOG.info("Job {} finished.", jobInfo.jobId());
               });
       result = new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ceca00edbd8..2fe92bb29f1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -188,7 +188,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       // register user-defined listeners.
       for (JavaStreamingListener listener :
           pipelineOptions.as(SparkContextOptions.class).getListeners()) {
-        LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
+        LOG.info("Registered listener {}.", listener.getClass().getSimpleName());
         jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
       }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 5f1da96e29d..31e3e34ee0d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -113,7 +113,7 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul
     SparkContextFactory.stopSparkContext(jsc);
 
     String debugString = visitor.getDebugString();
-    LOG.info("Translated Native Spark pipeline:\n" + debugString);
+    LOG.info("Translated Native Spark pipeline:\n{}", debugString);
     return new DebugSparkPipelineResult(debugString);
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index abcebbc23c5..646781574c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -90,7 +90,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     MetricsAccumulator.clear();
     GlobalWatermarkHolder.clear();
 
-    LOG.info("About to run test pipeline " + options.getJobName());
+    LOG.info("About to run test pipeline {}", options.getJobName());
 
     // if the pipeline was executed in streaming mode, validate aggregators.
     if (isForceStreaming) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
index 96f9098c351..007ca9cdc12 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -75,7 +75,7 @@ public class AggregatorsAccumulator {
           instance = accumulator;
         }
       }
-      LOG.info("Instantiated aggregators accumulator: " + instance.value());
+      LOG.info("Instantiated aggregators accumulator: {}", instance.value());
     }
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 3a3fd1c8c4f..9da520f6b9c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -309,10 +309,9 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     @Override
     public Reader call() throws Exception {
       LOG.info(
-          "No cached reader found for split: ["
-              + source
-              + "]. Creating new reader at checkpoint mark "
-              + checkpointMark);
+          "No cached reader found for split: [{}]. Creating new reader at checkpoint mark {}",
+          source,
+          checkpointMark);
       return new Reader(source.createReader(options, checkpointMark));
     }
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 0bd9143b50b..4b20f4e8713 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -183,7 +183,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
         proportionalDuration.isLongerThan(lowerBoundDuration)
             ? proportionalDuration
             : lowerBoundDuration;
-    LOG.info("Read duration set to: " + readDuration);
+    LOG.info("Read duration set to: {}", readDuration);
     return readDuration;
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index 3eb3cab55c9..c0a43f35693 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -76,7 +76,7 @@ public class MetricsAccumulator {
           instance = accumulator;
         }
       }
-      LOG.info("Instantiated metrics accumulator: " + instance.value());
+      LOG.info("Instantiated metrics accumulator: {}", instance.value());
     } else {
       instance.reset();
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 05187509269..81bd254678e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -279,7 +279,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
                   FluentIterable.from(JavaConversions.asJavaIterable(encodedElements))
                       .transform(bytes -> CoderHelpers.fromByteArray(bytes, wvCoder));
 
-              LOG.trace(logPrefix + ": input elements: {}", elements);
+              LOG.trace("{}: input elements: {}", logPrefix, elements);
 
               // Incoming expired windows are filtered based on
               // timerInternals.currentInputWatermarkTime() and the configured allowed
@@ -294,7 +294,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
                       LateDataUtils.dropExpiredWindows(
                           key, elements, timerInternals, windowingStrategy, droppedDueToLateness));
 
-              LOG.trace(logPrefix + ": non expired input elements: {}", nonExpiredElements);
+              LOG.trace("{}: non expired input elements: {}", logPrefix, nonExpiredElements);
 
               reduceFnRunner.processElements(nonExpiredElements);
             } catch (final Exception e) {
@@ -306,8 +306,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
           }
           try {
             // advance the watermark to HWM to fire by timers.
-            LOG.debug(
-                logPrefix + ": timerInternals before advance are {}", timerInternals.toString());
+            LOG.debug("{}: timerInternals before advance are {}", logPrefix, timerInternals);
 
             // store the highWatermark as the new inputWatermark to calculate triggers
             timerInternals.advanceWatermark();
@@ -317,7 +316,9 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
                     timerInternals.getTimers(), timerInternals.currentInputWatermarkTime());
 
             LOG.debug(
-                logPrefix + ": timers eligible for processing are {}", timersEligibleForProcessing);
+                "{}: timers eligible for processing are {}",
+                logPrefix,
+                timersEligibleForProcessing);
 
             // Note that at this point, the watermark has already advanced since
             // timerInternals.advanceWatermark() has been called and the highWatermark
@@ -352,12 +353,10 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
                     SparkTimerInternals.serializeTimers(
                         timerInternals.getTimers(), timerDataCoder));
 
-            /*
-            Not something we want to happen in production, but is very helpful
-            when debugging - TRACE.
-             */
-            LOG.trace(logPrefix + ": output elements are {}", Joiner.on(", ").join(outputs));
-
+            if (LOG.isTraceEnabled()) {
+              // Not something we want to happen in production, but is very helpful when debugging.
+              LOG.trace("{}: output elements are {}", logPrefix, Joiner.on(", ").join(outputs));
+            }
             // persist Spark's state by outputting.
             final List<byte[]> serOutput = CoderHelpers.toByteArrays(outputs, wvKvIterCoder);
             return new Tuple2<>(encodedKey, new Tuple2<>(updated, serOutput));
@@ -440,12 +439,12 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
       // log if there's something to log.
       final long lateDropped = droppedDueToLateness.getCumulative();
       if (lateDropped > 0) {
-        LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
+        LOG.info("Dropped {} elements due to lateness.", lateDropped);
         droppedDueToLateness.inc(-droppedDueToLateness.getCumulative());
       }
       final long closedWindowDropped = droppedDueToClosedWindow.getCumulative();
       if (closedWindowDropped > 0) {
-        LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
+        LOG.info("Dropped {} elements due to closed window.", closedWindowDropped);
         droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative());
       }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
index 637b04a3f10..24b5a0f867f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
@@ -114,8 +114,7 @@ class WatermarkSyncedDStream<T> extends InputDStream<WindowedValue<T>> {
     final long batchTime = validTime.milliseconds();
 
     LOG.trace(
-        "BEFORE waiting for watermark sync, "
-            + "LastWatermarkedBatchTime: {}, current batch time: {}",
+        "BEFORE waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}",
         GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
         batchTime);
 
@@ -133,8 +132,7 @@ class WatermarkSyncedDStream<T> extends InputDStream<WindowedValue<T>> {
     LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(batchDuration));
 
     LOG.trace(
-        "AFTER waiting for watermark sync, "
-            + "LastWatermarkedBatchTime: {}, current batch time: {}",
+        "AFTER waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}",
         GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
         batchTime);