Posted to commits@beam.apache.org by am...@apache.org on 2017/03/29 15:13:09 UTC
[1/2] beam git commit: [BEAM-1827] Fix use of deprecated Spark APIs in the runner.
Repository: beam
Updated Branches:
refs/heads/master 99056df36 -> 8a33591d9
[BEAM-1827] Fix use of deprecated Spark APIs in the runner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6671b5b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6671b5b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6671b5b6
Branch: refs/heads/master
Commit: 6671b5b6bae6c2a918481577ca2564bb45e7c280
Parents: 99056df
Author: Amit Sela <am...@gmail.com>
Authored: Wed Mar 29 10:39:49 2017 +0300
Committer: Amit Sela <am...@gmail.com>
Committed: Wed Mar 29 15:56:21 2017 +0300
----------------------------------------------------------------------
.../beam/runners/spark/SparkPipelineResult.java | 51 +++----
.../apache/beam/runners/spark/SparkRunner.java | 153 +++++++++----------
.../beam/runners/spark/io/SourceDStream.java | 4 +-
.../SparkRunnerStreamingContextFactory.java | 43 +++---
4 files changed, 113 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
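For orientation, the deprecated Spark calls replaced in the hunks below are:

    JavaStreamingContext.awaitTermination(long)        ->  awaitTerminationOrTimeout(long)
    StreamingContext.sc() / JavaStreamingContext.sc()  ->  sparkContext()
    JavaStreamingContextFactory (create())             ->  Function0<JavaStreamingContext> (call())

Most of the remaining churn is reformatting; the streaming context factory also makes its fields transient and replaces a stored EvaluationContext field with a local variable.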
http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index b2b2831..d2c5c8e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -38,9 +38,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;
-/**
- * Represents a Spark pipeline execution result.
- */
+/** Represents a Spark pipeline execution result. */
public abstract class SparkPipelineResult implements PipelineResult {
protected final Future pipelineExecution;
@@ -48,8 +46,7 @@ public abstract class SparkPipelineResult implements PipelineResult {
protected PipelineResult.State state;
private final SparkMetricResults metricResults = new SparkMetricResults();
- SparkPipelineResult(final Future<?> pipelineExecution,
- final JavaSparkContext javaSparkContext) {
+ SparkPipelineResult(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) {
this.pipelineExecution = pipelineExecution;
this.javaSparkContext = javaSparkContext;
// pipelineExecution is expected to have started executing eagerly.
@@ -130,13 +127,10 @@ public abstract class SparkPipelineResult implements PipelineResult {
return state;
}
- /**
- * Represents the result of running a batch pipeline.
- */
+ /** Represents the result of running a batch pipeline. */
static class BatchMode extends SparkPipelineResult {
- BatchMode(final Future<?> pipelineExecution,
- final JavaSparkContext javaSparkContext) {
+ BatchMode(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) {
super(pipelineExecution, javaSparkContext);
}
@@ -156,15 +150,13 @@ public abstract class SparkPipelineResult implements PipelineResult {
}
}
- /**
- * Represents a streaming Spark pipeline result.
- */
+ /** Represents a streaming Spark pipeline result. */
static class StreamingMode extends SparkPipelineResult {
private final JavaStreamingContext javaStreamingContext;
- StreamingMode(final Future<?> pipelineExecution,
- final JavaStreamingContext javaStreamingContext) {
+ StreamingMode(
+ final Future<?> pipelineExecution, final JavaStreamingContext javaStreamingContext) {
super(pipelineExecution, javaStreamingContext.sparkContext());
this.javaStreamingContext = javaStreamingContext;
}
@@ -176,7 +168,7 @@ public abstract class SparkPipelineResult implements PipelineResult {
// calling the StreamingContext's waiter with 0 msec will throw any error that might have
// been thrown during the "grace period".
try {
- javaStreamingContext.awaitTermination(0);
+ javaStreamingContext.awaitTerminationOrTimeout(0);
} catch (Exception e) {
throw beamExceptionFrom(e);
} finally {
@@ -188,24 +180,24 @@ public abstract class SparkPipelineResult implements PipelineResult {
}
@Override
- protected State awaitTermination(final Duration duration) throws ExecutionException,
- InterruptedException {
+ protected State awaitTermination(final Duration duration)
+ throws ExecutionException, InterruptedException {
pipelineExecution.get(); // execution is asynchronous anyway so no need to time-out.
javaStreamingContext.awaitTerminationOrTimeout(duration.getMillis());
State terminationState;
switch (javaStreamingContext.getState()) {
- case ACTIVE:
- terminationState = State.RUNNING;
- break;
- case STOPPED:
- terminationState = State.DONE;
- break;
- default:
- terminationState = State.UNKNOWN;
- break;
- }
- return terminationState;
+ case ACTIVE:
+ terminationState = State.RUNNING;
+ break;
+ case STOPPED:
+ terminationState = State.DONE;
+ break;
+ default:
+ terminationState = State.UNKNOWN;
+ break;
+ }
+ return terminationState;
}
}
@@ -216,5 +208,4 @@ public abstract class SparkPipelineResult implements PipelineResult {
stop();
}
}
-
}
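A minimal, self-contained sketch of the replacement call adopted above; the class and variable names here are illustrative, not part of the commit:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class AwaitTerminationExample {
      public static void main(String[] args) {
        JavaStreamingContext jssc =
            new JavaStreamingContext(
                new SparkConf().setMaster("local[2]").setAppName("await-example"),
                Durations.seconds(1));
        // Deprecated: jssc.awaitTermination(0) waited and returned void.
        // Replacement: a zero timeout returns immediately, rethrowing any error
        // the streaming job has raised so far, and reports whether the context
        // terminated within the timeout.
        boolean terminated = jssc.awaitTerminationOrTimeout(0);
        System.out.println("terminated within timeout: " + terminated);
        jssc.stop();
      }
    }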
http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 97532c4..5b4f73e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -65,40 +65,32 @@ import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * The SparkRunner translate operations defined on a pipeline to a representation
- * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
- * a Beam pipeline with the default options of a single threaded spark instance in local mode,
- * we would do the following:
+ * The SparkRunner translate operations defined on a pipeline to a representation executable by
+ * Spark, and then submitting the job to Spark to be executed. If we wanted to run a Beam pipeline
+ * with the default options of a single threaded spark instance in local mode, we would do the
+ * following:
*
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineResult result = (SparkPipelineResult) p.run();
- * }
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result =
+ * (SparkPipelineResult) p.run(); }
*
* <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
* we would do the following:
*
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- * options.setSparkMaster("spark://host:port");
- * SparkPipelineResult result = (SparkPipelineResult) p.run();
- * }
+ * <p>{@code Pipeline p = [logic for pipeline creation] SparkPipelineOptions options =
+ * SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
+ * SparkPipelineResult result = (SparkPipelineResult) p.run(); }
*/
public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
- /**
- * Options used in this pipeline runner.
- */
+ /** Options used in this pipeline runner. */
private final SparkPipelineOptions mOptions;
/**
- * Creates and returns a new SparkRunner with default options. In particular, against a
- * spark instance running in local mode.
+ * Creates and returns a new SparkRunner with default options. In particular, against a spark
+ * instance running in local mode.
*
* @return A pipeline runner with default options.
*/
@@ -156,11 +148,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
if (mOptions.isStreaming()) {
CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
- final SparkRunnerStreamingContextFactory contextFactory =
+ SparkRunnerStreamingContextFactory streamingContextFactory =
new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
final JavaStreamingContext jssc =
- JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
- contextFactory);
+ JavaStreamingContext.getOrCreate(
+ checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory);
// Checkpoint aggregator/metrics values
jssc.addStreamingListener(
@@ -171,7 +163,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
// register user-defined listeners.
- for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) {
+ for (JavaStreamingListener listener : mOptions.as(SparkContextOptions.class).getListeners()) {
LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
}
@@ -185,14 +177,16 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
// but this is fine since it is idempotent).
initAccumulators(mOptions, jssc.sparkContext());
- startPipeline = executorService.submit(new Runnable() {
+ startPipeline =
+ executorService.submit(
+ new Runnable() {
- @Override
- public void run() {
- LOG.info("Starting streaming pipeline execution.");
- jssc.start();
- }
- });
+ @Override
+ public void run() {
+ LOG.info("Starting streaming pipeline execution.");
+ jssc.start();
+ }
+ });
result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
} else {
@@ -206,15 +200,17 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
initAccumulators(mOptions, jsc);
- startPipeline = executorService.submit(new Runnable() {
+ startPipeline =
+ executorService.submit(
+ new Runnable() {
- @Override
- public void run() {
- pipeline.traverseTopologically(new Evaluator(translator, evaluationContext));
- evaluationContext.computeOutputs();
- LOG.info("Batch pipeline execution complete.");
- }
- });
+ @Override
+ public void run() {
+ pipeline.traverseTopologically(new Evaluator(translator, evaluationContext));
+ evaluationContext.computeOutputs();
+ LOG.info("Batch pipeline execution complete.");
+ }
+ });
result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
}
@@ -227,30 +223,28 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
private void registerMetricsSource(String appName) {
- final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
- final AggregatorMetricSource aggregatorMetricSource =
- new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
- final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null);
- final CompositeSource compositeSource =
- new CompositeSource(appName + ".Beam", metricsSource.metricRegistry(),
- aggregatorMetricSource.metricRegistry());
- // re-register the metrics in case of context re-use
- metricsSystem.removeSource(compositeSource);
- metricsSystem.registerSource(compositeSource);
+ final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
+ final AggregatorMetricSource aggregatorMetricSource =
+ new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
+ final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null);
+ final CompositeSource compositeSource =
+ new CompositeSource(
+ appName + ".Beam",
+ metricsSource.metricRegistry(),
+ aggregatorMetricSource.metricRegistry());
+ // re-register the metrics in case of context re-use
+ metricsSystem.removeSource(compositeSource);
+ metricsSystem.registerSource(compositeSource);
}
- /**
- * Init Metrics/Aggregators accumulators. This method is idempotent.
- */
+ /** Init Metrics/Aggregators accumulators. This method is idempotent. */
public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext jsc) {
// Init metrics accumulators
MetricsAccumulator.init(opts, jsc);
AggregatorsAccumulator.init(opts, jsc);
}
- /**
- * Visit the pipeline to determine the translation mode (batch/streaming).
- */
+ /** Visit the pipeline to determine the translation mode (batch/streaming). */
private void detectTranslationMode(Pipeline pipeline) {
TranslationModeDetector detector = new TranslationModeDetector();
pipeline.traverseTopologically(detector);
@@ -260,20 +254,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
}
- /**
- * Evaluator that update/populate the cache candidates.
- */
+ /** Evaluator that update/populate the cache candidates. */
public static void updateCacheCandidates(
- Pipeline pipeline,
- SparkPipelineTranslator translator,
- EvaluationContext evaluationContext) {
- CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext);
- pipeline.traverseTopologically(cacheVisitor);
+ Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
+ CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext);
+ pipeline.traverseTopologically(cacheVisitor);
}
- /**
- * The translation mode of the Beam Pipeline.
- */
+ /** The translation mode of the Beam Pipeline. */
enum TranslationMode {
/** Uses the batch mode. */
BATCH,
@@ -281,9 +269,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
STREAMING
}
- /**
- * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
- */
+ /** Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. */
private static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
private static final Collection<Class<? extends PTransform>> UNBOUNDED_INPUTS =
@@ -315,14 +301,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
}
- /**
- * Traverses the pipeline to populate the candidates for caching.
- */
+ /** Traverses the pipeline to populate the candidates for caching. */
static class CacheVisitor extends Evaluator {
protected CacheVisitor(
- SparkPipelineTranslator translator,
- EvaluationContext evaluationContext) {
+ SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
super(translator, evaluationContext);
}
@@ -345,9 +328,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
}
- /**
- * Evaluator on the pipeline.
- */
+ /** Evaluator on the pipeline. */
@SuppressWarnings("WeakerAccess")
public static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
@@ -399,7 +380,9 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
// defer if sideInputs are defined.
if (hasSideInput) {
- LOG.info("Deferring combine transformation {} for job {}", transform,
+ LOG.info(
+ "Deferring combine transformation {} for job {}",
+ transform,
ctxt.getPipeline().getOptions().getJobName());
return true;
}
@@ -412,14 +395,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
doVisitTransform(node);
}
- <TransformT extends PTransform<? super PInput, POutput>> void
- doVisitTransform(TransformHierarchy.Node node) {
+ <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(
+ TransformHierarchy.Node node) {
@SuppressWarnings("unchecked")
TransformT transform = (TransformT) node.getTransform();
@SuppressWarnings("unchecked")
Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
- @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator =
- translate(node, transform, transformClass);
+ @SuppressWarnings("unchecked")
+ TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
LOG.info("Evaluating {}", transform);
AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
ctxt.setCurrentTransform(appliedTransform);
@@ -432,7 +415,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
* translate with the proper translator.
*/
protected <TransformT extends PTransform<? super PInput, POutput>>
- TransformEvaluator<TransformT> translate(
+ TransformEvaluator<TransformT> translate(
TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
//--- determine if node is bounded/unbounded.
// usually, the input determines if the PCollection to apply the next transformation to
@@ -449,7 +432,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
LOG.debug("Translating {} as {}", transform, isNodeBounded);
return isNodeBounded.equals(PCollection.IsBounded.BOUNDED)
? translator.translateBounded(transformClass)
- : translator.translateUnbounded(transformClass);
+ : translator.translateUnbounded(transformClass);
}
protected PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) {
@@ -458,7 +441,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
// BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
// while BOUNDED + UNBOUNDED = UNBOUNDED.
PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
- for (TaggedPValue pValue: pValues) {
+ for (TaggedPValue pValue : pValues) {
if (pValue.getValue() instanceof PCollection) {
isBounded = isBounded.and(((PCollection) pValue.getValue()).isBounded());
} else {
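A minimal sketch of the Function0-based getOrCreate call site that replaces the deprecated JavaStreamingContextFactory overload used before this commit; the checkpoint path and app settings are illustrative:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class GetOrCreateExample {
      public static void main(String[] args) {
        final String checkpointPath = "/tmp/spark-checkpoint-example"; // illustrative
        JavaStreamingContext jssc =
            JavaStreamingContext.getOrCreate(
                checkpointPath,
                new Function0<JavaStreamingContext>() {
                  @Override
                  public JavaStreamingContext call() throws Exception {
                    // Invoked only when no checkpoint exists under checkpointPath.
                    JavaStreamingContext created =
                        new JavaStreamingContext(
                            new SparkConf().setMaster("local[2]").setAppName("get-or-create"),
                            Durations.seconds(1));
                    created.checkpoint(checkpointPath);
                    return created;
                  }
                });
        jssc.stop();
      }
    }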
http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 3f2c10a..b7bfeed 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -87,7 +87,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
options.getMinReadTimeMillis());
// set initial parallelism once.
- this.initialParallelism = ssc().sc().defaultParallelism();
+ this.initialParallelism = ssc().sparkContext().defaultParallelism();
checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
this.boundMaxRecords = boundMaxRecords > 0 ? boundMaxRecords : rateControlledMaxRecords();
@@ -106,7 +106,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd =
new SourceRDD.Unbounded<>(
- ssc().sc(),
+ ssc().sparkContext(),
runtimeContext,
createMicrobatchSource(),
numPartitions);
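Both hunks above make the same substitution: the deprecated sc() shorthand becomes sparkContext(). The same change on the Java-facing API, as a small hypothetical helper:

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    final class ParallelismExample {
      // jssc is assumed to be a live JavaStreamingContext supplied by the caller.
      static int defaultParallelism(JavaStreamingContext jssc) {
        // Formerly jssc.sc(); sparkContext() is the supported accessor, mirroring
        // the ssc().sparkContext() calls in the hunks above.
        JavaSparkContext jsc = jssc.sparkContext();
        return jsc.defaultParallelism();
      }
    }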
http://git-wip-us.apache.org/repos/asf/beam/blob/6671b5b6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 98521e9..2dd18f3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -32,47 +32,47 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A {@link JavaStreamingContext} factory for resilience.
- * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#how-to-configure-checkpointing">how-to-configure-checkpointing</a>
+ *
+ * @see <a
+ * href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#how-to-configure-checkpointing">how-to-configure-checkpointing</a>
*/
-public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
+public class SparkRunnerStreamingContextFactory implements Function0<JavaStreamingContext> {
private static final Logger LOG =
LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
- private final Pipeline pipeline;
- private final SparkPipelineOptions options;
- private final CheckpointDir checkpointDir;
+ // set members as transient to satisfy findbugs and since this only runs in driver.
+ private final transient Pipeline pipeline;
+ private final transient SparkPipelineOptions options;
+ private final transient CheckpointDir checkpointDir;
public SparkRunnerStreamingContextFactory(
- Pipeline pipeline,
- SparkPipelineOptions options,
- CheckpointDir checkpointDir) {
+ Pipeline pipeline, SparkPipelineOptions options, CheckpointDir checkpointDir) {
this.pipeline = pipeline;
this.options = options;
this.checkpointDir = checkpointDir;
}
- private EvaluationContext ctxt;
-
@Override
- public JavaStreamingContext create() {
+ public JavaStreamingContext call() throws Exception {
LOG.info("Creating a new Spark Streaming Context");
// validate unbounded read properties.
- checkArgument(options.getMinReadTimeMillis() < options.getBatchIntervalMillis(),
+ checkArgument(
+ options.getMinReadTimeMillis() < options.getBatchIntervalMillis(),
"Minimum read time has to be less than batch time.");
- checkArgument(options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1,
+ checkArgument(
+ options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1,
"Read time percentage is bound to (0, 1).");
- SparkPipelineTranslator translator = new StreamingTransformTranslator.Translator(
- new TransformTranslator.Translator());
+ SparkPipelineTranslator translator =
+ new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
Duration batchDuration = new Duration(options.getBatchIntervalMillis());
LOG.info("Setting Spark streaming batchDuration to {} msec", batchDuration.milliseconds());
@@ -82,24 +82,25 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
// We must first init accumulators since translators expect them to be instantiated.
SparkRunner.initAccumulators(options, jsc);
- ctxt = new EvaluationContext(jsc, pipeline, jssc);
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
// update cache candidates
SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
ctxt.computeOutputs();
- checkpoint(jssc);
+ checkpoint(jssc, checkpointDir);
return jssc;
}
- private void checkpoint(JavaStreamingContext jssc) {
+ private void checkpoint(JavaStreamingContext jssc, CheckpointDir checkpointDir) {
Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();
Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
try {
- FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration());
+ FileSystem fileSystem =
+ rootCheckpointPath.getFileSystem(jssc.sparkContext().hadoopConfiguration());
if (!fileSystem.exists(rootCheckpointPath)) {
fileSystem.mkdirs(rootCheckpointPath);
}
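A minimal standalone factory in the style adopted above: it implements Function0<JavaStreamingContext> (the old create() becomes call()) so it can be handed to JavaStreamingContext.getOrCreate. All names and settings here are illustrative:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class ExampleStreamingContextFactory implements Function0<JavaStreamingContext> {
      // transient, as in the factory above, since the factory only runs in the driver.
      private final transient String checkpointPath;

      public ExampleStreamingContextFactory(String checkpointPath) {
        this.checkpointPath = checkpointPath;
      }

      @Override
      public JavaStreamingContext call() throws Exception { // was create() on the old interface
        JavaStreamingContext jssc =
            new JavaStreamingContext(
                new SparkConf().setMaster("local[2]").setAppName("factory-example"),
                Durations.seconds(1));
        jssc.checkpoint(checkpointPath);
        return jssc;
      }
    }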
[2/2] beam git commit: This closes #2354
Posted by am...@apache.org.
This closes #2354
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8a33591d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8a33591d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8a33591d
Branch: refs/heads/master
Commit: 8a33591d9bc9dfd328f3b5a3ac22be192d81531a
Parents: 99056df 6671b5b
Author: Amit Sela <am...@gmail.com>
Authored: Wed Mar 29 15:56:35 2017 +0300
Committer: Amit Sela <am...@gmail.com>
Committed: Wed Mar 29 15:56:35 2017 +0300
----------------------------------------------------------------------
.../beam/runners/spark/SparkPipelineResult.java | 51 +++----
.../apache/beam/runners/spark/SparkRunner.java | 153 +++++++++----------
.../beam/runners/spark/io/SourceDStream.java | 4 +-
.../SparkRunnerStreamingContextFactory.java | 43 +++---
4 files changed, 113 insertions(+), 138 deletions(-)
----------------------------------------------------------------------