You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/05/25 21:15:21 UTC
[1/2] incubator-beam git commit: Use Create.of withCoder instead of
setCoder on the created PCollection
Repository: incubator-beam
Updated Branches:
refs/heads/master a3fc40aa3 -> 9f97ea0a7
Use Create.of withCoder instead of setCoder on the created PCollection
One more left..
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69da98a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69da98a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69da98a9
Branch: refs/heads/master
Commit: 69da98a93956add8e86d094cb866bf86c5626089
Parents: 78c8c52
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Tue May 24 13:37:04 2016 -0700
Committer: Sela <an...@paypal.com>
Committed: Wed May 25 23:59:15 2016 +0300
----------------------------------------------------------------------
.../test/java/org/apache/beam/runners/spark/DeDupTest.java | 2 +-
.../java/org/apache/beam/runners/spark/EmptyInputTest.java | 2 +-
.../org/apache/beam/runners/spark/SimpleWordCountTest.java | 8 ++++----
.../java/org/apache/beam/runners/spark/io/NumShardsTest.java | 2 +-
.../beam/runners/spark/translation/CombineGloballyTest.java | 2 +-
.../beam/runners/spark/translation/CombinePerKeyTest.java | 2 +-
.../runners/spark/translation/WindowedWordCountTest.java | 8 ++++----
7 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index 0b48bed..285a2d6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -51,7 +51,7 @@ public class DeDupTest {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
- PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
+ PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of()));
PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index 7b25e34..f227e94 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -46,7 +46,7 @@ public class EmptyInputTest {
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
List<String> empty = Collections.emptyList();
- PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());
+ PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
EvaluationResult res = SparkPipelineRunner.create().run(p);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index eee120e..61ad24f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -66,8 +66,8 @@ public class SimpleWordCountTest {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
- .of());
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+ .of()));
PCollection<String> output = inputWords.apply(new CountWords());
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
@@ -84,8 +84,8 @@ public class SimpleWordCountTest {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
- .of());
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
+ .of()));
PCollection<String> output = inputWords.apply(new CountWords());
File outputFile = testFolder.newFile();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 8ce35c4..23d4592 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -75,7 +75,7 @@ public class NumShardsTest {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index ac64540..9a3edd3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -52,7 +52,7 @@ public class CombineGloballyTest {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
EvaluationResult res = SparkPipelineRunner.create().run(p);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 4e6c888..face526 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -52,7 +52,7 @@ public class CombinePerKeyTest {
@Test
public void testRun() {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+ PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
EvaluationResult res = SparkPipelineRunner.create().run(p);
Map<String, Long> actualCnts = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 8062658..c6911e1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -75,8 +75,8 @@ public class WindowedWordCountTest {
@Test
public void testFixed2() throws Exception {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
- .setCoder(StringUtf8Coder.of());
+ PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
+ .withCoder(StringUtf8Coder.of()));
PCollection<String> windowedWords = inputWords
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
@@ -95,8 +95,8 @@ public class WindowedWordCountTest {
@Test
public void testSliding() throws Exception {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
- .setCoder(StringUtf8Coder.of());
+ PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
+ .withCoder(StringUtf8Coder.of()));
PCollection<String> windowedWords = inputWords
.apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
.every(Duration.standardMinutes(1))));
[2/2] incubator-beam git commit: [BEAM-305] Replace usages of
PCollection.setCoder with Create.of().withCoder in Spark Runner
Posted by am...@apache.org.
[BEAM-305] Replace usages of PCollection.setCoder with Create.of().withCoder in Spark Runner
This closes #386
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f97ea0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f97ea0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f97ea0a
Branch: refs/heads/master
Commit: 9f97ea0a7ddec61a3ff23841dbf74fa0a260dcae
Parents: a3fc40a 69da98a
Author: Sela <an...@paypal.com>
Authored: Thu May 26 00:03:38 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Thu May 26 00:03:38 2016 +0300
----------------------------------------------------------------------
.../test/java/org/apache/beam/runners/spark/DeDupTest.java | 2 +-
.../java/org/apache/beam/runners/spark/EmptyInputTest.java | 2 +-
.../org/apache/beam/runners/spark/SimpleWordCountTest.java | 8 ++++----
.../java/org/apache/beam/runners/spark/io/NumShardsTest.java | 2 +-
.../beam/runners/spark/translation/CombineGloballyTest.java | 2 +-
.../beam/runners/spark/translation/CombinePerKeyTest.java | 2 +-
.../runners/spark/translation/WindowedWordCountTest.java | 8 ++++----
7 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------