You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/15 00:09:05 UTC

[1/2] incubator-beam git commit: Remove Spark(Streaming)PipelineOptionsFactory

Repository: incubator-beam
Updated Branches:
  refs/heads/master 96765f19b -> 6511ba28e


Remove Spark(Streaming)PipelineOptionsFactory

Pipeline authors should generally not use any runner-specific classes,
but instead should select the runner and appropriate configurations
through the PipelineOptionsFactory.fromArgs() method. The runner can
then obtain the appropriately typed PipelineOptions class as required
and do any necessary validation. Failing this, they should use the
provided PipelineOptions#as() method to acquire the appropriately typed
options.

If required, users should construct SparkPipelineOptions via
PipelineOptionsFactory.as(SparkPipelineOptions.class).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11ba2b9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11ba2b9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11ba2b9c

Branch: refs/heads/master
Commit: 11ba2b9c865e4bb78790056e0cd9fa94abe000f5
Parents: 96765f1
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 11 13:58:59 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 14 10:26:06 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineRunner.java |  6 ++-
 .../SparkPipelineOptionsFactory.java            | 31 ---------------
 .../SparkStreamingPipelineOptionsFactory.java   | 31 ---------------
 .../apache/beam/runners/spark/DeDupTest.java    | 16 ++++----
 .../beam/runners/spark/EmptyInputTest.java      | 15 +++++---
 .../beam/runners/spark/SimpleWordCountTest.java | 18 +++++----
 .../apache/beam/runners/spark/TfIdfTest.java    | 11 ++++--
 .../runners/spark/coders/WritableCoderTest.java |  1 +
 .../beam/runners/spark/io/AvroPipelineTest.java | 22 ++++++-----
 .../beam/runners/spark/io/NumShardsTest.java    | 26 +++++++------
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  7 ++--
 .../spark/io/hadoop/ShardNameBuilderTest.java   |  1 +
 .../spark/translation/CombineGloballyTest.java  | 14 ++++---
 .../spark/translation/CombinePerKeyTest.java    | 12 +++---
 .../spark/translation/DoFnOutputTest.java       |  8 ++--
 .../translation/MultiOutputWordCountTest.java   | 10 +++--
 .../spark/translation/SerializationTest.java    | 27 +++++++------
 .../spark/translation/SideEffectsTest.java      | 10 +++--
 .../translation/SparkPipelineOptionsTest.java   | 40 ++++++++++++++++++++
 .../TestSparkPipelineOptionsFactory.java        | 38 -------------------
 .../translation/TransformTranslatorTest.java    | 20 +++++-----
 .../translation/WindowedWordCountTest.java      | 10 +++--
 .../streaming/FlattenStreamingTest.java         | 13 ++++---
 .../streaming/KafkaStreamingTest.java           | 24 +++++++-----
 .../streaming/SimpleStreamingWordCountTest.java | 20 ++++++----
 .../streaming/utils/EmbeddedKafkaCluster.java   | 11 +++---
 .../streaming/utils/PAssertStreaming.java       |  1 +
 27 files changed, 217 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 99da74f..2b33e7a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.spark;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.SparkProcessContext;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
@@ -30,6 +29,7 @@ import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTra
 import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
@@ -85,7 +86,8 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
    * @return A pipeline runner with default options.
    */
   public static SparkPipelineRunner create() {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
     return new SparkPipelineRunner(options);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java
deleted file mode 100644
index fe89ee3..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-public final class SparkPipelineOptionsFactory {
-  private SparkPipelineOptionsFactory() {
-  }
-
-  public static SparkPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
deleted file mode 100644
index 4abf816..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-public final class SparkStreamingPipelineOptionsFactory {
-
-  private SparkStreamingPipelineOptionsFactory() {
-  }
-
-  public static SparkStreamingPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index d13404c..0b48bed 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -18,20 +18,22 @@
 
 package org.apache.beam.runners.spark;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 /**
  * A test based on {@code DeDupExample} from the SDK.
  */
@@ -46,7 +48,7 @@ public class DeDupTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index eeca10f..f9b00cc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -20,24 +20,27 @@ package org.apache.beam.runners.spark;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.Iterables;
+
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.List;
+
 public class EmptyInputTest {
 
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     List<String> empty = Collections.emptyList();
     PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index c413b3f..faa4dbf 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -18,15 +18,9 @@
 
 package org.apache.beam.runners.spark;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -37,8 +31,16 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 public class SimpleWordCountTest {
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob",
@@ -49,7 +51,7 @@ public class SimpleWordCountTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 4e80fe9..00c4657 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.beam.runners.spark;
 
-import java.net.URI;
-import java.util.Arrays;
-
 import org.apache.beam.examples.complete.TfIdf;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
@@ -31,8 +28,12 @@ import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.junit.Test;
 
+import java.net.URI;
+import java.util.Arrays;
+
 /**
  * A test based on {@code TfIdf} from the SDK.
  */
@@ -40,7 +41,9 @@ public class TfIdfTest {
 
   @Test
   public void testTfIdf() throws Exception {
-    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
+    SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline pipeline = Pipeline.create(opts);
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
index 538fd97..f2bd4d3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.coders;
 
 import org.apache.beam.sdk.testing.CoderProperties;
+
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index f8f9624..693e2c6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -20,13 +20,16 @@ package org.apache.beam.runners.spark.io;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
@@ -34,17 +37,16 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
 public class AvroPipelineTest {
 
   private File inputFile;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 37a7d54..85eeabd 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -21,32 +21,34 @@ package org.apache.beam.runners.spark.io;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
 import org.apache.beam.examples.WordCount;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
-import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 public class NumShardsTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -67,7 +69,7 @@ public class NumShardsTest {
 
   @Test
   public void testText() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index b7df739..0c8c6fc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -20,9 +20,6 @@ package org.apache.beam.runners.spark.io.hadoop;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.runners.spark.coders.WritableCoder;
@@ -31,6 +28,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -46,6 +44,9 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+
 public class HadoopFileFormatPipelineTest {
 
   private File inputFile;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
index eecfd58..55991a4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutput
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
+
 import static org.junit.Assert.assertEquals;
 
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index bd90aae..a644673 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -20,20 +20,23 @@ package org.apache.beam.runners.spark.translation;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.Arrays;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.Iterables;
+
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class CombineGloballyTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -43,7 +46,8 @@ public class CombineGloballyTest {
 
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
     PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 6cf9330..4e0bc5d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -37,9 +32,16 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableList;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class CombinePerKeyTest {
 
     private static final List<String> WORDS =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 80b5fc5..ca97a96 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -18,23 +18,25 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.io.Serializable;
-
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.junit.Test;
 
+import java.io.Serializable;
+
 public class DoFnOutputTest implements Serializable {
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 1450a8f..6a862c9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -47,9 +43,15 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Set;
+
 public class MultiOutputWordCountTest {
 
   private static final TupleTag<String> upper = new TupleTag<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 2237ea2..75d3fb2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -18,17 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -36,6 +25,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -46,8 +36,21 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 public class SerializationTest {
 
   public static class StringHolder { // not serializable
@@ -117,7 +120,7 @@ public class SerializationTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<StringHolder> inputWords =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index b85d935..14abbfc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -22,20 +22,22 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.Serializable;
-import java.net.URI;
-
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.net.URI;
+
 public class SideEffectsTest implements Serializable {
 
   static class UserException extends RuntimeException {
@@ -43,7 +45,7 @@ public class SideEffectsTest implements Serializable {
 
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
new file mode 100644
index 0000000..bf18486
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SparkPipelineOptionsTest {
+  @Test
+  public void testDefaultCreateMethod() {
+    SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    Assert.assertEquals("local[1]", actualOptions.getSparkMaster());
+  }
+
+  @Test
+  public void testSettingCustomOptions() {
+    SparkPipelineOptions actualOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    actualOptions.setSparkMaster("spark://207.184.161.138:7077");
+    Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java
deleted file mode 100644
index 9cace83..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TestSparkPipelineOptionsFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestSparkPipelineOptionsFactory {
-  @Test
-  public void testDefaultCreateMethod() {
-    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
-    Assert.assertEquals("local[1]", actualOptions.getSparkMaster());
-  }
-
-  @Test
-  public void testSettingCustomOptions() {
-    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
-    actualOptions.setSparkMaster("spark://207.184.161.138:7077");
-    Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index f07fa0b..de4a5d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.api.client.repackaged.com.google.common.base.Joiner;
-import com.google.common.base.Charsets;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -34,6 +25,10 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.repackaged.com.google.common.base.Joiner;
+import com.google.common.base.Charsets;
+
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +36,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * A test for the transforms registered in TransformTranslator.
  * Builds a regular Dataflow pipeline with each of the mapped

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 5134b5c..0db8913 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.util.Arrays;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -34,9 +30,15 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableList;
+
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class WindowedWordCountTest {
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 17f7fbd..9152d72 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
@@ -28,6 +24,7 @@ import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.View;
@@ -35,9 +32,14 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Test Flatten (union) implementation for streaming.
  */
@@ -57,7 +59,8 @@ public class FlattenStreamingTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    SparkStreamingPipelineOptions options =
+        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index a0c8a4e..e1ff227 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -17,15 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import kafka.serializer.StringDecoder;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
@@ -35,6 +26,7 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -43,6 +35,10 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
@@ -52,6 +48,13 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import kafka.serializer.StringDecoder;
 /**
  * Test Kafka as input.
  */
@@ -92,7 +95,8 @@ public class KafkaStreamingTest {
   @Test
   public void testRun() throws Exception {
     // test read from Kafka
-    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    SparkStreamingPipelineOptions options =
+        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index de554e2..ef224da 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -17,12 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -31,14 +25,23 @@ import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
 public class SimpleStreamingWordCountTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -51,7 +54,8 @@ public class SimpleStreamingWordCountTest {
 
   @Test
   public void testRun() throws Exception {
-    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    SparkStreamingPipelineOptions options =
+        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
@@ -72,4 +76,4 @@ public class SimpleStreamingWordCountTest {
 
     PAssertStreaming.assertNoFailures(res);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index 21c8115..2ade467 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -17,6 +17,12 @@
  */
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -31,11 +37,6 @@ import java.util.Random;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.Time;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * https://gist.github.com/fjavieralba/7930018

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11ba2b9c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 3f50798..041cc50 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
 import org.apache.beam.runners.spark.EvaluationResult;
+
 import org.junit.Assert;
 
 /**


[2/2] incubator-beam git commit: [BEAM-22] This closes #167

Posted by lc...@apache.org.
[BEAM-22] This closes #167


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6511ba28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6511ba28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6511ba28

Branch: refs/heads/master
Commit: 6511ba28e4edbe9945df998df362787e8dbeb401
Parents: 96765f1 11ba2b9
Author: Luke Cwik <lc...@google.com>
Authored: Thu Apr 14 15:08:50 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Apr 14 15:08:50 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineRunner.java |  6 ++-
 .../SparkPipelineOptionsFactory.java            | 31 ---------------
 .../SparkStreamingPipelineOptionsFactory.java   | 31 ---------------
 .../apache/beam/runners/spark/DeDupTest.java    | 16 ++++----
 .../beam/runners/spark/EmptyInputTest.java      | 15 +++++---
 .../beam/runners/spark/SimpleWordCountTest.java | 18 +++++----
 .../apache/beam/runners/spark/TfIdfTest.java    | 11 ++++--
 .../runners/spark/coders/WritableCoderTest.java |  1 +
 .../beam/runners/spark/io/AvroPipelineTest.java | 22 ++++++-----
 .../beam/runners/spark/io/NumShardsTest.java    | 26 +++++++------
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  7 ++--
 .../spark/io/hadoop/ShardNameBuilderTest.java   |  1 +
 .../spark/translation/CombineGloballyTest.java  | 14 ++++---
 .../spark/translation/CombinePerKeyTest.java    | 12 +++---
 .../spark/translation/DoFnOutputTest.java       |  8 ++--
 .../translation/MultiOutputWordCountTest.java   | 10 +++--
 .../spark/translation/SerializationTest.java    | 27 +++++++------
 .../spark/translation/SideEffectsTest.java      | 10 +++--
 .../translation/SparkPipelineOptionsTest.java   | 40 ++++++++++++++++++++
 .../TestSparkPipelineOptionsFactory.java        | 38 -------------------
 .../translation/TransformTranslatorTest.java    | 20 +++++-----
 .../translation/WindowedWordCountTest.java      | 10 +++--
 .../streaming/FlattenStreamingTest.java         | 13 ++++---
 .../streaming/KafkaStreamingTest.java           | 24 +++++++-----
 .../streaming/SimpleStreamingWordCountTest.java | 20 ++++++----
 .../streaming/utils/EmbeddedKafkaCluster.java   | 11 +++---
 .../streaming/utils/PAssertStreaming.java       |  1 +
 27 files changed, 217 insertions(+), 226 deletions(-)
----------------------------------------------------------------------