You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/05/25 21:15:21 UTC

[1/2] incubator-beam git commit: Use Create.of withCoder instead of setCoder on the created PCollection

Repository: incubator-beam
Updated Branches:
  refs/heads/master a3fc40aa3 -> 9f97ea0a7


Use Create.of withCoder instead of setCoder on the created PCollection

One more left..


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69da98a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69da98a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69da98a9

Branch: refs/heads/master
Commit: 69da98a93956add8e86d094cb866bf86c5626089
Parents: 78c8c52
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Tue May 24 13:37:04 2016 -0700
Committer: Sela <an...@paypal.com>
Committed: Wed May 25 23:59:15 2016 +0300

----------------------------------------------------------------------
 .../test/java/org/apache/beam/runners/spark/DeDupTest.java   | 2 +-
 .../java/org/apache/beam/runners/spark/EmptyInputTest.java   | 2 +-
 .../org/apache/beam/runners/spark/SimpleWordCountTest.java   | 8 ++++----
 .../java/org/apache/beam/runners/spark/io/NumShardsTest.java | 2 +-
 .../beam/runners/spark/translation/CombineGloballyTest.java  | 2 +-
 .../beam/runners/spark/translation/CombinePerKeyTest.java    | 2 +-
 .../runners/spark/translation/WindowedWordCountTest.java     | 8 ++++----
 7 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index 0b48bed..285a2d6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -51,7 +51,7 @@ public class DeDupTest {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
-    PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
+    PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SET);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index 7b25e34..f227e94 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -46,7 +46,7 @@ public class EmptyInputTest {
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     List<String> empty = Collections.emptyList();
-    PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());
+    PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index eee120e..61ad24f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -66,8 +66,8 @@ public class SimpleWordCountTest {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
-        .of());
+    PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+        .of()));
     PCollection<String> output = inputWords.apply(new CountWords());
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
@@ -84,8 +84,8 @@ public class SimpleWordCountTest {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
-        .of());
+    PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+        .of()));
     PCollection<String> output = inputWords.apply(new CountWords());
 
     File outputFile = testFolder.newFile();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 8ce35c4..23d4592 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -75,7 +75,7 @@ public class NumShardsTest {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
     output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index ac64540..9a3edd3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -52,7 +52,7 @@ public class CombineGloballyTest {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 4e6c888..face526 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -52,7 +52,7 @@ public class CombinePerKeyTest {
     @Test
     public void testRun() {
         Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-        PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+        PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
         PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
         EvaluationResult res = SparkPipelineRunner.create().run(p);
         Map<String, Long> actualCnts = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 8062658..c6911e1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -75,8 +75,8 @@ public class WindowedWordCountTest {
   @Test
   public void testFixed2() throws Exception {
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
-        .setCoder(StringUtf8Coder.of());
+    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
+        .withCoder(StringUtf8Coder.of()));
     PCollection<String> windowedWords = inputWords
         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
 
@@ -95,8 +95,8 @@ public class WindowedWordCountTest {
   @Test
   public void testSliding() throws Exception {
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
-        .setCoder(StringUtf8Coder.of());
+    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
+        .withCoder(StringUtf8Coder.of()));
     PCollection<String> windowedWords = inputWords
         .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
         .every(Duration.standardMinutes(1))));


[2/2] incubator-beam git commit: [BEAM-305] Replace usages of PCollection.setCoder with Create.of().withCoder in Spark Runner

Posted by am...@apache.org.
[BEAM-305] Replace usages of PCollection.setCoder with Create.of().withCoder in Spark Runner

This closes #386


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f97ea0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f97ea0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f97ea0a

Branch: refs/heads/master
Commit: 9f97ea0a7ddec61a3ff23841dbf74fa0a260dcae
Parents: a3fc40a 69da98a
Author: Sela <an...@paypal.com>
Authored: Thu May 26 00:03:38 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Thu May 26 00:03:38 2016 +0300

----------------------------------------------------------------------
 .../test/java/org/apache/beam/runners/spark/DeDupTest.java   | 2 +-
 .../java/org/apache/beam/runners/spark/EmptyInputTest.java   | 2 +-
 .../org/apache/beam/runners/spark/SimpleWordCountTest.java   | 8 ++++----
 .../java/org/apache/beam/runners/spark/io/NumShardsTest.java | 2 +-
 .../beam/runners/spark/translation/CombineGloballyTest.java  | 2 +-
 .../beam/runners/spark/translation/CombinePerKeyTest.java    | 2 +-
 .../runners/spark/translation/WindowedWordCountTest.java     | 8 ++++----
 7 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------