Posted to commits@beam.apache.org by am...@apache.org on 2017/01/30 20:58:59 UTC
[1/2] beam git commit: [BEAM-648] Persist and restore Aggregator values in case of recovery from failure
Repository: beam
Updated Branches:
refs/heads/master 343176c00 -> 847e4e9f0
[BEAM-648] Persist and restore Aggregator values in case of recovery from failure
Added javadoc and minor refactor
Moved creation of beam checkpoint dir
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62f9e7b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62f9e7b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62f9e7b1
Branch: refs/heads/master
Commit: 62f9e7b1e1a8a8f2317e3508ccce615f2b30d4f6
Parents: 343176c
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Jan 22 14:30:44 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Mon Jan 30 22:53:34 2017 +0200
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 21 ++++-
.../spark/aggregators/AccumulatorSingleton.java | 96 ++++++++++++++++++--
.../spark/aggregators/SparkAggregators.java | 20 +++-
.../translation/streaming/CheckpointDir.java | 69 ++++++++++++++
.../SparkRunnerStreamingContextFactory.java | 44 ++++++---
.../ResumeFromCheckpointStreamingTest.java | 5 +-
6 files changed, 230 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 92c07bb..578ed21 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,12 +18,14 @@
package org.apache.beam.runners.spark;
+import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
@@ -32,6 +34,7 @@ import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
@@ -54,6 +57,7 @@ import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,7 +134,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) {
- final Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(jsc);
+ Optional<CheckpointDir> maybeCheckpointDir =
+ opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
+ : Optional.<CheckpointDir>absent();
+ final Accumulator<NamedAggregators> accum =
+ SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir);
final NamedAggregators initialValue = accum.value();
if (opts.getEnableSparkMetricSinks()) {
@@ -154,10 +162,17 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
detectTranslationMode(pipeline);
if (mOptions.isStreaming()) {
+ CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
final SparkRunnerStreamingContextFactory contextFactory =
- new SparkRunnerStreamingContextFactory(pipeline, mOptions);
+ new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir);
final JavaStreamingContext jssc =
- JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory);
+ JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
+ contextFactory);
+
+ // Checkpoint aggregator values
+ jssc.addStreamingListener(
+ new JavaStreamingListenerWrapper(
+ new AccumulatorSingleton.AccumulatorCheckpointingSparkListener()));
startPipeline = executorService.submit(new Runnable() {
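The listener registered above fires once per completed micro-batch. A minimal standalone sketch of the same hook, using a hypothetical BatchCompletedHook class for illustration (the commit's real listener is AccumulatorSingleton.AccumulatorCheckpointingSparkListener, shown below):

    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaStreamingListener;
    import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
    import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;

    /** Hypothetical listener: runs a side effect once per completed micro-batch. */
    class BatchCompletedHook extends JavaStreamingListener {
      @Override
      public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
        // e.g. persist accumulator values to a reliable filesystem.
      }

      /** Registration mirrors the wiring in SparkRunner above. */
      static void register(JavaStreamingContext jssc) {
        jssc.addStreamingListener(new JavaStreamingListenerWrapper(new BatchCompletedHook()));
      }
    }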
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
index 883830e..473750c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
@@ -19,35 +19,119 @@
package org.apache.beam.runners.spark.aggregators;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* For resilience, {@link Accumulator}s are required to be wrapped in a Singleton.
* @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a>
*/
-class AccumulatorSingleton {
+public class AccumulatorSingleton {
+ private static final Logger LOG = LoggerFactory.getLogger(AccumulatorSingleton.class);
+
+ private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "beam_aggregators";
- private static volatile Accumulator<NamedAggregators> instance = null;
+ private static volatile Accumulator<NamedAggregators> instance;
+ private static volatile FileSystem fileSystem;
+ private static volatile Path checkpointPath;
+ private static volatile Path tempCheckpointPath;
+ private static volatile Path backupCheckpointPath;
- static Accumulator<NamedAggregators> getInstance(JavaSparkContext jsc) {
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ static Accumulator<NamedAggregators> getInstance(
+ JavaSparkContext jsc,
+ Optional<CheckpointDir> checkpointDir) {
if (instance == null) {
synchronized (AccumulatorSingleton.class) {
if (instance == null) {
- //TODO: currently when recovering from checkpoint, Spark does not recover the
- // last known Accumulator value. The SparkRunner should be able to persist and recover
- // the NamedAggregators in order to recover Aggregators as well.
instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam());
+ if (checkpointDir.isPresent()) {
+ recoverValueFromCheckpoint(jsc, checkpointDir.get());
+ }
}
}
}
return instance;
}
+ private static void recoverValueFromCheckpoint(
+ JavaSparkContext jsc,
+ CheckpointDir checkpointDir) {
+ FSDataInputStream is = null;
+ try {
+ Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
+ checkpointPath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME);
+ tempCheckpointPath = checkpointPath.suffix(".tmp");
+ backupCheckpointPath = checkpointPath.suffix(".bak");
+ fileSystem = checkpointPath.getFileSystem(jsc.hadoopConfiguration());
+ if (fileSystem.exists(checkpointPath)) {
+ is = fileSystem.open(checkpointPath);
+ } else if (fileSystem.exists(backupCheckpointPath)) {
+ is = fileSystem.open(backupCheckpointPath);
+ }
+ if (is != null) {
+ ObjectInputStream objectInputStream = new ObjectInputStream(is);
+ NamedAggregators recoveredValue =
+ (NamedAggregators) objectInputStream.readObject();
+ objectInputStream.close();
+ LOG.info("Recovered accumulators from checkpoint: " + recoveredValue);
+ instance.setValue(recoveredValue);
+ } else {
+ LOG.info("No accumulator checkpoint found.");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failure while reading accumulator checkpoint.", e);
+ }
+ }
+
+ private static void checkpoint() throws IOException {
+ if (checkpointPath != null) {
+ if (fileSystem.exists(checkpointPath)) {
+ if (fileSystem.exists(backupCheckpointPath)) {
+ fileSystem.delete(backupCheckpointPath, false);
+ }
+ fileSystem.rename(checkpointPath, backupCheckpointPath);
+ }
+ FSDataOutputStream os = fileSystem.create(tempCheckpointPath, true);
+ ObjectOutputStream oos = new ObjectOutputStream(os);
+ oos.writeObject(instance.value());
+ oos.close();
+ fileSystem.rename(tempCheckpointPath, checkpointPath);
+ }
+ }
+
@VisibleForTesting
static void clear() {
synchronized (AccumulatorSingleton.class) {
instance = null;
}
}
+
+ /**
+ * Spark Listener which checkpoints {@link NamedAggregators} values for fault-tolerance.
+ */
+ public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener {
+ @Override
+ public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+ try {
+ checkpoint();
+ } catch (IOException e) {
+ LOG.error("Failed to checkpoint accumulator singleton.", e);
+ }
+ }
+ }
}
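The checkpoint() method above follows a write-then-rename pattern so a crash mid-write never clobbers the last good file. A self-contained sketch of that pattern against the Hadoop FileSystem API, with hypothetical AtomicCheckpointWriter/saveAtomically names that are not part of the commit:

    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class AtomicCheckpointWriter {
      /** Hypothetical helper: backs up the current file, writes a temp file, then promotes it. */
      static void saveAtomically(Path target, Serializable value, Configuration conf)
          throws IOException {
        FileSystem fs = target.getFileSystem(conf);
        Path tmp = target.suffix(".tmp");
        Path bak = target.suffix(".bak");
        // Keep the last good checkpoint as a .bak that recovery can still read.
        if (fs.exists(target)) {
          if (fs.exists(bak)) {
            fs.delete(bak, false);
          }
          fs.rename(target, bak);
        }
        // Write the new value to a temporary file, then promote it via rename;
        // rename is the closest thing to an atomic commit on HDFS-like filesystems.
        try (FSDataOutputStream os = fs.create(tmp, true);
             ObjectOutputStream oos = new ObjectOutputStream(os)) {
          oos.writeObject(value);
        }
        fs.rename(tmp, target);
      }
    }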
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index fa5c8d1..245c69e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -18,12 +18,14 @@
package org.apache.beam.runners.spark.aggregators;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
@@ -66,10 +68,24 @@ public class SparkAggregators {
*
* @param jsc a Spark context to be used in order to retrieve the name
* {@link NamedAggregators} instance
- * @return a {@link NamedAggregators} instance
*/
public static Accumulator<NamedAggregators> getNamedAggregators(JavaSparkContext jsc) {
- return AccumulatorSingleton.getInstance(jsc);
+ return getOrCreateNamedAggregators(jsc, Optional.<CheckpointDir>absent());
+ }
+
+ /**
+ * Retrieves or creates the {@link NamedAggregators} instance using the provided Spark context.
+ *
+ * @param jsc a Spark context to be used in order to retrieve the name
+ * {@link NamedAggregators} instance
+ * @param checkpointDir checkpoint dir (optional, for streaming pipelines)
+ * @return a {@link NamedAggregators} instance
+ */
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ public static Accumulator<NamedAggregators> getOrCreateNamedAggregators(
+ JavaSparkContext jsc,
+ Optional<CheckpointDir> checkpointDir) {
+ return AccumulatorSingleton.getInstance(jsc, checkpointDir);
}
/**
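A minimal sketch of the two call sites this split supports, assuming jsc and opts are already constructed: batch callers pass an absent checkpoint dir, streaming callers pass one derived from the pipeline options, as SparkRunner does above.

    import com.google.common.base.Optional;
    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.aggregators.NamedAggregators;
    import org.apache.beam.runners.spark.aggregators.SparkAggregators;
    import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
    import org.apache.spark.Accumulator;
    import org.apache.spark.api.java.JavaSparkContext;

    class AggregatorLookup {
      /** Batch: no checkpoint dir, so there is nothing to recover from. */
      static Accumulator<NamedAggregators> forBatch(JavaSparkContext jsc) {
        return SparkAggregators.getOrCreateNamedAggregators(
            jsc, Optional.<CheckpointDir>absent());
      }

      /** Streaming: pass the checkpoint dir so a prior value can be restored. */
      static Accumulator<NamedAggregators> forStreaming(
          JavaSparkContext jsc, SparkPipelineOptions opts) {
        return SparkAggregators.getOrCreateNamedAggregators(
            jsc, Optional.of(new CheckpointDir(opts.getCheckpointDir())));
      }
    }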
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
new file mode 100644
index 0000000..5b192bd
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/CheckpointDir.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Spark checkpoint dir tree.
+ *
+ * {@link SparkPipelineOptions} checkpointDir is used as a root directory under which one directory
+ * is created for Spark's checkpoint and another for Beam's Spark runner's fault tolerance needs.
+ * Spark's checkpoint relies on Hadoop's {@link org.apache.hadoop.fs.FileSystem} and is used for
+ * Beam as well rather than {@link org.apache.beam.sdk.io.FileSystem} to be consistent with Spark.
+ */
+public class CheckpointDir {
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointDir.class);
+
+ private static final String SPARK_CHECKPOINT_DIR = "spark-checkpoint";
+ private static final String BEAM_CHECKPOINT_DIR = "beam-checkpoint";
+ private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
+
+ private final Path rootCheckpointDir;
+ private final Path sparkCheckpointDir;
+ private final Path beamCheckpointDir;
+
+ public CheckpointDir(String rootCheckpointDir) {
+ if (!rootCheckpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
+ LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
+ + "of failures this job may not recover properly or even at all.", rootCheckpointDir);
+ }
+ LOG.info("Checkpoint dir set to: {}", rootCheckpointDir);
+
+ this.rootCheckpointDir = new Path(rootCheckpointDir);
+ this.sparkCheckpointDir = new Path(rootCheckpointDir, SPARK_CHECKPOINT_DIR);
+ this.beamCheckpointDir = new Path(rootCheckpointDir, BEAM_CHECKPOINT_DIR);
+ }
+
+ public Path getRootCheckpointDir() {
+ return rootCheckpointDir;
+ }
+
+ public Path getSparkCheckpointDir() {
+ return sparkCheckpointDir;
+ }
+
+ public Path getBeamCheckpointDir() {
+ return beamCheckpointDir;
+ }
+}
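For orientation, a small usage sketch of the directory layout this class produces; the HDFS URI is illustrative only:

    import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
    import org.apache.hadoop.fs.Path;

    class CheckpointDirExample {
      public static void main(String[] args) {
        // Illustrative root; a reliable filesystem (hdfs, s3, gs) is needed for
        // recovery to survive worker or driver loss.
        CheckpointDir dirs = new CheckpointDir("hdfs://namenode/beam/cp");
        Path forSpark = dirs.getSparkCheckpointDir(); // hdfs://namenode/beam/cp/spark-checkpoint
        Path forBeam = dirs.getBeamCheckpointDir();   // hdfs://namenode/beam/cp/beam-checkpoint
        System.out.println(forSpark + " and " + forBeam);
      }
    }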
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index d069a11..6d254e1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
import static com.google.common.base.Preconditions.checkArgument;
+import java.io.IOException;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
@@ -28,6 +29,8 @@ import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.sdk.Pipeline;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -45,14 +48,18 @@ import org.slf4j.LoggerFactory;
public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
private static final Logger LOG =
LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
- private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
private final Pipeline pipeline;
private final SparkPipelineOptions options;
+ private final CheckpointDir checkpointDir;
- public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions options) {
+ public SparkRunnerStreamingContextFactory(
+ Pipeline pipeline,
+ SparkPipelineOptions options,
+ CheckpointDir checkpointDir) {
this.pipeline = pipeline;
this.options = options;
+ this.checkpointDir = checkpointDir;
}
private EvaluationContext ctxt;
@@ -73,18 +80,12 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+
ctxt = new EvaluationContext(jsc, pipeline, jssc);
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
ctxt.computeOutputs();
- // set checkpoint dir.
- String checkpointDir = options.getCheckpointDir();
- if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
- LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
- + "of failures this job may not recover properly or even at all.", checkpointDir);
- }
- LOG.info("Checkpoint dir set to: {}", checkpointDir);
- jssc.checkpoint(checkpointDir);
+ checkpoint(jssc);
// register listeners.
for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners()) {
@@ -95,7 +96,26 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
return jssc;
}
- public EvaluationContext getCtxt() {
- return ctxt;
+ private void checkpoint(JavaStreamingContext jssc) {
+ Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
+ Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();
+ Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
+
+ try {
+ FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sc().hadoopConfiguration());
+ if (!fileSystem.exists(rootCheckpointPath)) {
+ fileSystem.mkdirs(rootCheckpointPath);
+ }
+ if (!fileSystem.exists(sparkCheckpointPath)) {
+ fileSystem.mkdirs(sparkCheckpointPath);
+ }
+ if (!fileSystem.exists(beamCheckpointPath)) {
+ fileSystem.mkdirs(beamCheckpointPath);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create checkpoint dir", e);
+ }
+
+ jssc.checkpoint(sparkCheckpointPath.toString());
}
}
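The factory's create() only runs when no Spark checkpoint exists yet; on recovery, JavaStreamingContext.getOrCreate deserializes the previous context and the factory is skipped. A minimal sketch of that contract, assuming pipeline and options are already built (it mirrors the wiring in SparkRunner above):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.translation.streaming.CheckpointDir;
    import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    class StreamingContextBootstrap {
      static JavaStreamingContext bootstrap(Pipeline pipeline, SparkPipelineOptions options) {
        CheckpointDir checkpointDir = new CheckpointDir(options.getCheckpointDir());
        SparkRunnerStreamingContextFactory factory =
            new SparkRunnerStreamingContextFactory(pipeline, options, checkpointDir);
        // First run: factory.create() translates the pipeline and checkpoints to
        // <root>/spark-checkpoint. On recovery the context is rebuilt from that
        // checkpoint and the factory is never invoked.
        return JavaStreamingContext.getOrCreate(
            checkpointDir.getSparkCheckpointDir().toString(), factory);
      }
    }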
http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7346bd9..8280672 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -81,6 +81,7 @@ public class ResumeFromCheckpointStreamingTest {
);
private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
private static final long EXPECTED_AGG_FIRST = 4L;
+ private static final long EXPECTED_AGG_SECOND = 8L;
@Rule
public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@@ -141,8 +142,8 @@ public class ResumeFromCheckpointStreamingTest {
res = runAgain(options);
long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class);
assertThat(String.format("Expected %d processed messages count but "
- + "found %d", EXPECTED_AGG_FIRST, processedMessages2), processedMessages2,
- equalTo(EXPECTED_AGG_FIRST));
+ + "found %d", EXPECTED_AGG_SECOND, processedMessages2), processedMessages2,
+ equalTo(EXPECTED_AGG_SECOND));
}
private SparkPipelineResult runAgain(SparkPipelineOptions options) {
[2/2] beam git commit: This closes #1815
Posted by am...@apache.org.
This closes #1815
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/847e4e9f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/847e4e9f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/847e4e9f
Branch: refs/heads/master
Commit: 847e4e9f0b84efa4726692cc8b7c9a0610703888
Parents: 343176c 62f9e7b
Author: Sela <an...@paypal.com>
Authored: Mon Jan 30 22:53:56 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Mon Jan 30 22:53:56 2017 +0200
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 21 ++++-
.../spark/aggregators/AccumulatorSingleton.java | 96 ++++++++++++++++++--
.../spark/aggregators/SparkAggregators.java | 20 +++-
.../translation/streaming/CheckpointDir.java | 69 ++++++++++++++
.../SparkRunnerStreamingContextFactory.java | 44 ++++++---
.../ResumeFromCheckpointStreamingTest.java | 5 +-
6 files changed, 230 insertions(+), 25 deletions(-)
----------------------------------------------------------------------