You are viewing a plain text version of this content. The canonical link for it is available in the mailing list archive (the original hyperlink was lost in this plain-text rendering).
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:49 UTC

[24/50] [abbrv] incubator-beam git commit: Update to dataflow 0.4.150710.

Update to dataflow 0.4.150710.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c01421ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c01421ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c01421ce

Branch: refs/heads/master
Commit: c01421ce3375712bae0c0b88b5f2141c67284ade
Parents: 3cae69b
Author: Tom White <to...@cloudera.com>
Authored: Mon Jul 13 21:08:57 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:15 2016 +0000

----------------------------------------------------------------------
 runners/spark/pom.xml                                        | 2 +-
 .../com/cloudera/dataflow/spark/SparkProcessContext.java     | 8 ++++----
 .../com/cloudera/dataflow/spark/TransformTranslator.java     | 8 ++++----
 .../test/java/com/cloudera/dataflow/spark/NumShardsTest.java | 4 +++-
 .../java/com/cloudera/dataflow/spark/SerializationTest.java  | 4 ++--
 5 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 0382108..3bce8c0 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -23,7 +23,7 @@ License.
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.7</java.version>
         <spark.version>1.3.1</spark.version>
-        <google-cloud-dataflow-version>0.4.150602</google-cloud-dataflow-version>
+        <google-cloud-dataflow-version>0.4.150710</google-cloud-dataflow-version>
     </properties>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index ee51c35..d0e9d6a 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -148,14 +148,14 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
       }
 
       @Override
-      public <T> void store(CodedTupleTag<T> tag, T value, Instant timestamp)
-          throws IOException {
+      public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException {
         throw new UnsupportedOperationException(
-            "WindowingInternals#store() is not yet supported.");
+            "WindowingInternals#writeToTagList() is not yet supported.");
       }
 
       @Override
-      public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException {
+      public <T> void writeToTagList(CodedTupleTag<T> tag, T value, Instant timestamp)
+          throws IOException {
         throw new UnsupportedOperationException(
             "WindowingInternals#writeToTagList() is not yet supported.");
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index e1af3cf..f137218 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -520,10 +520,10 @@ public final class TransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<Create<T>> create() {
-    return new TransformEvaluator<Create<T>>() {
+  private static <T> TransformEvaluator<Create.Values<T>> create() {
+    return new TransformEvaluator<Create.Values<T>>() {
       @Override
-      public void evaluate(Create<T> transform, EvaluationContext context) {
+      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
         Iterable<T> elems = transform.getElements();
         // Use a coder to convert the objects in the PCollection to byte arrays, so they
         // can be transferred over the network.
@@ -624,7 +624,7 @@ public final class TransformTranslator {
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());
     EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
-    EVALUATORS.put(Create.class, create());
+    EVALUATORS.put(Create.Values.class, create());
     EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
     EVALUATORS.put(View.AsIterable.class, viewAsIter());
     EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
index 8985e66..9572b0f 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
@@ -20,6 +20,7 @@ import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Sets;
@@ -62,7 +63,8 @@ public class NumShardsTest {
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-    PCollection<String> output = inputWords.apply(new WordCount.CountWords());
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(ParDo.of(new WordCount.FormatAsTextFn()));
     output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
     EvaluationResult res = SparkPipelineRunner.create().run(p);
     res.close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
index 40591e5..bd1a4e8 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
@@ -112,8 +112,8 @@ public class SerializationTest {
     SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
-    PCollection<StringHolder> inputWords = p.apply(Create.of(WORDS)).setCoder
-        (StringHolderUtf8Coder.of());
+    PCollection<StringHolder> inputWords =
+        p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of()));
     PCollection<StringHolder> output = inputWords.apply(new CountWords());
 
     DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);