You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2017/03/29 15:13:09 UTC

[1/2] beam git commit: [BEAM-1827] Fix use of deprecated Spark APIs in the runner.

Repository: beam
Updated Branches:
  refs/heads/master 99056df36 -> 8a33591d9


[BEAM-1827] Fix use of deprecated Spark APIs in the runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6671b5b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6671b5b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6671b5b6

Branch: refs/heads/master
Commit: 6671b5b6bae6c2a918481577ca2564bb45e7c280
Parents: 99056df
Author: Amit Sela <am...@gmail.com>
Authored: Wed Mar 29 10:39:49 2017 +0300
Committer: Amit Sela <am...@gmail.com>
Committed: Wed Mar 29 15:56:21 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineResult.java |  51 +++----
 .../apache/beam/runners/spark/SparkRunner.java  | 153 +++++++++----------
 .../beam/runners/spark/io/SourceDStream.java    |   4 +-
 .../SparkRunnerStreamingContextFactory.java     |  43 +++---
 4 files changed, 113 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index b2b2831..d2c5c8e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -38,9 +38,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
 
-/**
- * Represents a Spark pipeline execution result.
- */
+/** Represents a Spark pipeline execution result. */
 public abstract class SparkPipelineResult implements PipelineResult {
 
   protected final Future pipelineExecution;
@@ -48,8 +46,7 @@ public abstract class SparkPipelineResult implements PipelineResult {
   protected PipelineResult.State state;
   private final SparkMetricResults metricResults = new SparkMetricResults();
 
-  SparkPipelineResult(final Future<?> pipelineExecution,
-                      final JavaSparkContext javaSparkContext) {
+  SparkPipelineResult(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) {
     this.pipelineExecution = pipelineExecution;
     this.javaSparkContext = javaSparkContext;
     // pipelineExecution is expected to have started executing eagerly.
@@ -130,13 +127,10 @@ public abstract class SparkPipelineResult implements PipelineResult {
     return state;
   }
 
-  /**
-   * Represents the result of running a batch pipeline.
-   */
+  /** Represents the result of running a batch pipeline. */
   static class BatchMode extends SparkPipelineResult {
 
-    BatchMode(final Future<?> pipelineExecution,
-              final JavaSparkContext javaSparkContext) {
+    BatchMode(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) {
       super(pipelineExecution, javaSparkContext);
     }
 
@@ -156,15 +150,13 @@ public abstract class SparkPipelineResult implements PipelineResult {
     }
   }
 
-  /**
-   * Represents a streaming Spark pipeline result.
-   */
+  /** Represents a streaming Spark pipeline result. */
   static class StreamingMode extends SparkPipelineResult {
 
     private final JavaStreamingContext javaStreamingContext;
 
-    StreamingMode(final Future<?> pipelineExecution,
-                  final JavaStreamingContext javaStreamingContext) {
+    StreamingMode(
+        final Future<?> pipelineExecution, final JavaStreamingContext javaStreamingContext) {
       super(pipelineExecution, javaStreamingContext.sparkContext());
       this.javaStreamingContext = javaStreamingContext;
     }
@@ -176,7 +168,7 @@ public abstract class SparkPipelineResult implements PipelineResult {
       // calling the StreamingContext's waiter with 0 msec will throw any error that might have
       // been thrown during the "grace period".
       try {
-        javaStreamingContext.awaitTermination(0);
+        javaStreamingContext.awaitTerminationOrTimeout(0);
       } catch (Exception e) {
         throw beamExceptionFrom(e);
       } finally {
@@ -188,24 +180,24 @@ public abstract class SparkPipelineResult implements PipelineResult {
     }
 
     @Override
-    protected State awaitTermination(final Duration duration) throws ExecutionException,
-        InterruptedException {
+    protected State awaitTermination(final Duration duration)
+        throws ExecutionException, InterruptedException {
       pipelineExecution.get(); // execution is asynchronous anyway so no need to time-out.
       javaStreamingContext.awaitTerminationOrTimeout(duration.getMillis());
 
       State terminationState;
       switch (javaStreamingContext.getState()) {
-         case ACTIVE:
-           terminationState = State.RUNNING;
-           break;
-         case STOPPED:
-           terminationState = State.DONE;
-           break;
-         default:
-           terminationState = State.UNKNOWN;
-           break;
-       }
-       return terminationState;
+        case ACTIVE:
+          terminationState = State.RUNNING;
+          break;
+        case STOPPED:
+          terminationState = State.DONE;
+          break;
+        default:
+          terminationState = State.UNKNOWN;
+          break;
+      }
+      return terminationState;
     }
   }
 
@@ -216,5 +208,4 @@ public abstract class SparkPipelineResult implements PipelineResult {
       stop();
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 97532c4..5b4f73e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -65,40 +65,32 @@ import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * The SparkRunner translate operations defined on a pipeline to a representation
- * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
- * a Beam pipeline with the default options of a single threaded spark instance in local mode,
- * we would do the following:
+ * The SparkRunner translates operations defined on a pipeline to a representation executable by
+ * Spark, and then submits the job to Spark to be executed. If we wanted to run a Beam pipeline
+ * with the default options of a single threaded spark instance in local mode, we would do the
+ * following:
  *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineResult result = (SparkPipelineResult) p.run();
- * }
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result =
+ * (SparkPipelineResult) p.run(); }
  *
  * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
  * we would do the following:
  *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- * options.setSparkMaster("spark://host:port");
- * SparkPipelineResult result = (SparkPipelineResult) p.run();
- * }
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options =
+ * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
+ * SparkPipelineResult result = (SparkPipelineResult) p.run(); }
  */
 public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
 
-  /**
-   * Options used in this pipeline runner.
-   */
+  /** Options used in this pipeline runner. */
   private final SparkPipelineOptions mOptions;
 
   /**
-   * Creates and returns a new SparkRunner with default options. In particular, against a
-   * spark instance running in local mode.
+   * Creates and returns a new SparkRunner with default options. In particular, against a spark
+   * instance running in local mode.
    *
    * @return A pipeline runner with default options.
    */
@@ -156,11 +148,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
     if (mOptions.isStreaming()) {
       CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
-      final SparkRunnerStreamingContextFactory contextFactory =
+      SparkRunnerStreamingContextFactory streamingContextFactory =
           new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
       final JavaStreamingContext jssc =
-          JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
-              contextFactory);
+          JavaStreamingContext.getOrCreate(
+              checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory);
 
       // Checkpoint aggregator/metrics values
       jssc.addStreamingListener(
@@ -171,7 +163,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
               new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
 
       // register user-defined listeners.
-      for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) {
+      for (JavaStreamingListener listener : mOptions.as(SparkContextOptions.class).getListeners()) {
         LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
         jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
       }
@@ -185,14 +177,16 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       // but this is fine since it is idempotent).
       initAccumulators(mOptions, jssc.sparkContext());
 
-      startPipeline = executorService.submit(new Runnable() {
+      startPipeline =
+          executorService.submit(
+              new Runnable() {
 
-        @Override
-        public void run() {
-          LOG.info("Starting streaming pipeline execution.");
-          jssc.start();
-        }
-      });
+                @Override
+                public void run() {
+                  LOG.info("Starting streaming pipeline execution.");
+                  jssc.start();
+                }
+              });
 
       result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
     } else {
@@ -206,15 +200,17 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
       initAccumulators(mOptions, jsc);
 
-      startPipeline = executorService.submit(new Runnable() {
+      startPipeline =
+          executorService.submit(
+              new Runnable() {
 
-        @Override
-        public void run() {
-          pipeline.traverseTopologically(new Evaluator(translator, evaluationContext));
-          evaluationContext.computeOutputs();
-          LOG.info("Batch pipeline execution complete.");
-        }
-      });
+                @Override
+                public void run() {
+                  pipeline.traverseTopologically(new Evaluator(translator, evaluationContext));
+                  evaluationContext.computeOutputs();
+                  LOG.info("Batch pipeline execution complete.");
+                }
+              });
 
       result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
     }
@@ -227,30 +223,28 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   }
 
   private void registerMetricsSource(String appName) {
-      final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
-      final AggregatorMetricSource aggregatorMetricSource =
-          new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
-      final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null);
-      final CompositeSource compositeSource =
-          new CompositeSource(appName + ".Beam", metricsSource.metricRegistry(),
-              aggregatorMetricSource.metricRegistry());
-      // re-register the metrics in case of context re-use
-      metricsSystem.removeSource(compositeSource);
-      metricsSystem.registerSource(compositeSource);
+    final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
+    final AggregatorMetricSource aggregatorMetricSource =
+        new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
+    final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null);
+    final CompositeSource compositeSource =
+        new CompositeSource(
+            appName + ".Beam",
+            metricsSource.metricRegistry(),
+            aggregatorMetricSource.metricRegistry());
+    // re-register the metrics in case of context re-use
+    metricsSystem.removeSource(compositeSource);
+    metricsSystem.registerSource(compositeSource);
   }
 
-  /**
-   * Init Metrics/Aggregators accumulators. This method is idempotent.
-   */
+  /** Init Metrics/Aggregators accumulators. This method is idempotent. */
   public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext jsc) {
     // Init metrics accumulators
     MetricsAccumulator.init(opts, jsc);
     AggregatorsAccumulator.init(opts, jsc);
   }
 
-  /**
-   * Visit the pipeline to determine the translation mode (batch/streaming).
-   */
+  /** Visit the pipeline to determine the translation mode (batch/streaming). */
   private void detectTranslationMode(Pipeline pipeline) {
     TranslationModeDetector detector = new TranslationModeDetector();
     pipeline.traverseTopologically(detector);
@@ -260,20 +254,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
   }
 
-  /**
-   * Evaluator that update/populate the cache candidates.
-   */
+  /** Evaluator that updates/populates the cache candidates. */
   public static void updateCacheCandidates(
-      Pipeline pipeline,
-      SparkPipelineTranslator translator,
-      EvaluationContext evaluationContext) {
-     CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext);
-     pipeline.traverseTopologically(cacheVisitor);
+      Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
+    CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext);
+    pipeline.traverseTopologically(cacheVisitor);
   }
 
-  /**
-   * The translation mode of the Beam Pipeline.
-   */
+  /** The translation mode of the Beam Pipeline. */
   enum TranslationMode {
     /** Uses the batch mode. */
     BATCH,
@@ -281,9 +269,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     STREAMING
   }
 
-  /**
-   * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
-   */
+  /** Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. */
   private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
     private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
     private static final Collection<Class<? extends PTransform>> UNBOUNDED_INPUTS =
@@ -315,14 +301,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
   }
 
-  /**
-   * Traverses the pipeline to populate the candidates for caching.
-   */
+  /** Traverses the pipeline to populate the candidates for caching. */
   static class CacheVisitor extends Evaluator {
 
     protected CacheVisitor(
-        SparkPipelineTranslator translator,
-        EvaluationContext evaluationContext) {
+        SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
       super(translator, evaluationContext);
     }
 
@@ -345,9 +328,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
   }
 
-  /**
-   * Evaluator on the pipeline.
-   */
+  /** Evaluator on the pipeline. */
   @SuppressWarnings("WeakerAccess")
   public static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
     private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
@@ -399,7 +380,9 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       }
       // defer if sideInputs are defined.
       if (hasSideInput) {
-        LOG.info("Deferring combine transformation {} for job {}", transform,
+        LOG.info(
+            "Deferring combine transformation {} for job {}",
+            transform,
             ctxt.getPipeline().getOptions().getJobName());
         return true;
       }
@@ -412,14 +395,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       doVisitTransform(node);
     }
 
-    <TransformT extends PTransform<? super PInput, POutput>> void
-    doVisitTransform(TransformHierarchy.Node node) {
+    <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(
+        TransformHierarchy.Node node) {
       @SuppressWarnings("unchecked")
       TransformT transform = (TransformT) node.getTransform();
       @SuppressWarnings("unchecked")
       Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
-      @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator =
-          translate(node, transform, transformClass);
+      @SuppressWarnings("unchecked")
+      TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
       LOG.info("Evaluating {}", transform);
       AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
       ctxt.setCurrentTransform(appliedTransform);
@@ -432,7 +415,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
      * translate with the proper translator.
      */
     protected <TransformT extends PTransform<? super PInput, POutput>>
-    TransformEvaluator<TransformT> translate(
+        TransformEvaluator<TransformT> translate(
             TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
       //--- determine if node is bounded/unbounded.
       // usually, the input determines if the PCollection to apply the next transformation to
@@ -449,7 +432,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       LOG.debug("Translating {} as {}", transform, isNodeBounded);
       return isNodeBounded.equals(PCollection.IsBounded.BOUNDED)
           ? translator.translateBounded(transformClass)
-              : translator.translateUnbounded(transformClass);
+          : translator.translateUnbounded(transformClass);
     }
 
     protected PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) {
@@ -458,7 +441,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
       // while BOUNDED + UNBOUNDED = UNBOUNDED.
       PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
-      for (TaggedPValue pValue: pValues) {
+      for (TaggedPValue pValue : pValues) {
         if (pValue.getValue() instanceof PCollection) {
           isBounded = isBounded.and(((PCollection) pValue.getValue()).isBounded());
         } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 3f2c10a..b7bfeed 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -87,7 +87,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
         options.getMinReadTimeMillis());
     // set initial parallelism once.
-    this.initialParallelism = ssc().sc().defaultParallelism();
+    this.initialParallelism = ssc().sparkContext().defaultParallelism();
     checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
 
     this.boundMaxRecords = boundMaxRecords > 0 ? boundMaxRecords : rateControlledMaxRecords();
@@ -106,7 +106,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
     RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd =
         new SourceRDD.Unbounded<>(
-            ssc().sc(),
+            ssc().sparkContext(),
             runtimeContext,
             createMicrobatchSource(),
             numPartitions);

http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 98521e9..2dd18f3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -32,47 +32,47 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function0;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * A {@link JavaStreamingContext} factory for resilience.
- * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#how-to-configure-checkpointing">how-to-configure-checkpointing</a>
+ *
+ * @see <a
+ *     href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#how-to-configure-checkpointing">how-to-configure-checkpointing</a>
  */
-public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
+public class SparkRunnerStreamingContextFactory implements Function0<JavaStreamingContext> {
   private static final Logger LOG =
       LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
 
-  private final Pipeline pipeline;
-  private final SparkPipelineOptions options;
-  private final CheckpointDir checkpointDir;
+  // set members as transient to satisfy findbugs and since this only runs in driver.
+  private final transient Pipeline pipeline;
+  private final transient SparkPipelineOptions options;
+  private final transient CheckpointDir checkpointDir;
 
   public SparkRunnerStreamingContextFactory(
-      Pipeline pipeline,
-      SparkPipelineOptions options,
-      CheckpointDir checkpointDir) {
+      Pipeline pipeline, SparkPipelineOptions options, CheckpointDir checkpointDir) {
     this.pipeline = pipeline;
     this.options = options;
     this.checkpointDir = checkpointDir;
   }
 
-  private EvaluationContext ctxt;
-
   @Override
-  public JavaStreamingContext create() {
+  public JavaStreamingContext call() throws Exception {
     LOG.info("Creating a new Spark Streaming Context");
     // validate unbounded read properties.
-    checkArgument(options.getMinReadTimeMillis() < options.getBatchIntervalMillis(),
+    checkArgument(
+        options.getMinReadTimeMillis() < options.getBatchIntervalMillis(),
         "Minimum read time has to be less than batch time.");
-    checkArgument(options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1,
+    checkArgument(
+        options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1,
         "Read time percentage is bound to (0, 1).");
 
-    SparkPipelineTranslator translator = new StreamingTransformTranslator.Translator(
-        new TransformTranslator.Translator());
+    SparkPipelineTranslator translator =
+        new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
     Duration batchDuration = new Duration(options.getBatchIntervalMillis());
     LOG.info("Setting Spark streaming batchDuration to {} msec", batchDuration.milliseconds());
 
@@ -82,24 +82,25 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     // We must first init accumulators since translators expect them to be instantiated.
     SparkRunner.initAccumulators(options, jsc);
 
-    ctxt = new EvaluationContext(jsc, pipeline, jssc);
+    EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
     // update cache candidates
     SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
 
-    checkpoint(jssc);
+    checkpoint(jssc, checkpointDir);
 
     return jssc;
   }
 
-  private void checkpoint(JavaStreamingContext jssc) {
+  private void checkpoint(JavaStreamingContext jssc, CheckpointDir checkpointDir) {
     Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
     Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();
     Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
 
     try {
-      FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration());
+      FileSystem fileSystem =
+          rootCheckpointPath.getFileSystem(jssc.sparkContext().hadoopConfiguration());
       if (!fileSystem.exists(rootCheckpointPath)) {
         fileSystem.mkdirs(rootCheckpointPath);
       }


[2/2] beam git commit: This closes #2354

Posted by am...@apache.org.
This closes #2354


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8a33591d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8a33591d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8a33591d

Branch: refs/heads/master
Commit: 8a33591d9bc9dfd328f3b5a3ac22be192d81531a
Parents: 99056df 6671b5b
Author: Amit Sela <am...@gmail.com>
Authored: Wed Mar 29 15:56:35 2017 +0300
Committer: Amit Sela <am...@gmail.com>
Committed: Wed Mar 29 15:56:35 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineResult.java |  51 +++----
 .../apache/beam/runners/spark/SparkRunner.java  | 153 +++++++++----------
 .../beam/runners/spark/io/SourceDStream.java    |   4 +-
 .../SparkRunnerStreamingContextFactory.java     |  43 +++---
 4 files changed, 113 insertions(+), 138 deletions(-)
----------------------------------------------------------------------