You are viewing a plain text version of this content. The canonical link to it (shown as "here" in the original page) was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/07 19:59:27 UTC
[37/50] incubator-beam git commit: [BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs.
[BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a75d1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a75d1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a75d1f
Branch: refs/heads/apex-runner
Commit: 90a75d1fb0706ec4cc25a9eeeca8ade1b3b7de28
Parents: 46fbfe0
Author: Sela <an...@paypal.com>
Authored: Thu Nov 3 18:22:20 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Fri Nov 4 23:59:40 2016 +0200
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 3 +--
.../SparkRunnerStreamingContextFactory.java | 23 +++++---------------
.../streaming/EmptyStreamAssertionTest.java | 3 +--
.../streaming/FlattenStreamingTest.java | 6 ++---
.../streaming/KafkaStreamingTest.java | 6 ++---
.../ResumeFromCheckpointStreamingTest.java | 3 +--
.../streaming/SimpleStreamingWordCountTest.java | 3 +--
.../utils/TestOptionsForStreaming.java | 12 +++++-----
8 files changed, 19 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 08e14fe..4eada35 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -77,8 +77,7 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
- SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class);
- return "file:///tmp/" + sparkPipelineOptions.getJobName();
+ return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 2378788..a670f61 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,11 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -48,7 +43,7 @@ import org.slf4j.LoggerFactory;
public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
private static final Logger LOG =
LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
- private static final Iterable<String> KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs");
+ private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
private final Pipeline pipeline;
private final SparkPipelineOptions options;
@@ -83,19 +78,11 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
// set checkpoint dir.
String checkpointDir = options.getCheckpointDir();
- LOG.info("Checkpoint dir set to: {}", checkpointDir);
- try {
- // validate checkpoint dir and warn if not of a known durable filesystem.
- URL checkpointDirUrl = new URL(checkpointDir);
- if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol()))) {
- LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures "
- + "this job may not recover properly or even at all.", checkpointDirUrl);
- }
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to form checkpoint dir URL. CheckpointDir should be in "
- + "the form of hdfs:///path/to/dir or other reliable fs protocol, "
- + "or file:///path/to/dir for local mode.", e);
+ if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
+ LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
+ + "of failures this job may not recover properly or even at all.", checkpointDir);
}
+ LOG.info("Checkpoint dir set to: {}", checkpointDir);
jssc.checkpoint(checkpointDir);
// register listeners.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 4f2a7c6..3e95b4d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -55,8 +55,7 @@ public class EmptyStreamAssertionTest implements Serializable {
@Test
public void testAssertion() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
- checkpointParentDir.newFolder(getClass().getSimpleName()));
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
Duration windowDuration = new Duration(options.getBatchIntervalMillis());
Pipeline pipeline = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index e6872f1..319b5e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -61,8 +61,7 @@ public class FlattenStreamingTest {
@Test
public void testFlattenUnbounded() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
- checkpointParentDir.newFolder(getClass().getSimpleName()));
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
Pipeline p = Pipeline.create(options);
PCollection<String> w1 =
@@ -81,8 +80,7 @@ public class FlattenStreamingTest {
@Test
public void testFlattenBoundedUnbounded() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
- checkpointParentDir.newFolder(getClass().getSimpleName()));
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
Pipeline p = Pipeline.create(options);
PCollection<String> w1 =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fe2d04e..f01059f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -74,8 +74,7 @@ public class KafkaStreamingTest {
@Test
public void testEarliest2Topics() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
- checkpointParentDir.newFolder(getClass().getSimpleName()));
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
// It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
// so to be on the safe side we'll set to 750 msec.
options.setMinReadTimeMillis(750L);
@@ -122,8 +121,7 @@ public class KafkaStreamingTest {
@Test
public void testLatest() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
- checkpointParentDir.newFolder(getClass().getSimpleName()));
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
//--- setup
final String topic = "topic";
// messages.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index ca0b668..34ffbe2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -108,8 +108,7 @@ public class ResumeFromCheckpointStreamingTest {
@Test
public void testRun() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
- checkpointParentDir.newFolder(getClass().getSimpleName()));
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
// It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
// so to be on the safe side we'll set to 750 msec.
options.setMinReadTimeMillis(750L);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4bc9a3d..edba507 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -62,8 +62,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
@Test
public void testFixedWindows() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
- checkpointParentDir.newFolder(getClass().getSimpleName()));
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
// override defaults
options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
index d695df0..2861d9f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
@@ -19,12 +19,13 @@
package org.apache.beam.runners.spark.translation.streaming.utils;
-import java.io.File;
-import java.net.MalformedURLException;
+import java.io.IOException;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+
/**
@@ -41,11 +42,10 @@ public class TestOptionsForStreaming extends ExternalResource {
options.setTimeout(1000L);
}
- public SparkPipelineOptions withTmpCheckpointDir(File checkpointDir)
- throws MalformedURLException {
+ public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
+ throws IOException {
// tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
- // so need to add the missing protocol.
- options.setCheckpointDir(checkpointDir.toURI().toURL().toString());
+ options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
return options;
}