You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:43 UTC

[18/50] [abbrv] incubator-beam git commit: Update to dataflow 0.4.150727.

Update the Google Cloud Dataflow SDK dependency from 0.4.150710 to 0.4.150727.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89945bf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89945bf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89945bf6

Branch: refs/heads/master
Commit: 89945bf676affe2cd52fed91551cb1037fc2faae
Parents: b83d666
Author: Tom White <to...@cloudera.com>
Authored: Wed Aug 5 18:10:59 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:15 2016 +0000

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  2 +-
 .../dataflow/spark/SparkProcessContext.java     | 53 ++++++--------------
 .../dataflow/spark/TransformTranslator.java     |  8 +--
 .../com/cloudera/dataflow/spark/TfIdfTest.java  |  2 +-
 .../dataflow/spark/TransformTranslatorTest.java |  5 ++
 5 files changed, 24 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index de9efb9..74b0fed 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -23,7 +23,7 @@ License.
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.7</java.version>
         <spark.version>1.3.1</spark.version>
-        <google-cloud-dataflow-version>0.4.150710</google-cloud-dataflow-version>
+        <google-cloud-dataflow-version>0.4.150727</google-cloud-dataflow-version>
     </properties>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index bda838c..259f90c 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -19,7 +19,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -29,10 +28,11 @@ import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.util.TimerManager;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
+import com.google.cloud.dataflow.sdk.util.state.StateInternals;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.collect.AbstractIterator;
@@ -126,6 +126,11 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
   }
 
   @Override
+  public PaneInfo pane() {
+    return PaneInfo.DEFAULT;
+  }
+
+  @Override
   public WindowingInternals<I, O> windowingInternals() {
     return new WindowingInternals<I, O>() {
 
@@ -136,53 +141,25 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
 
       @Override
       public void outputWindowedValue(O output, Instant timestamp, Collection<?
-          extends BoundedWindow> windows) {
+          extends BoundedWindow> windows, PaneInfo paneInfo) {
         output(output);
       }
 
       @Override
-      public KeyedState keyedState() {
+      public StateInternals stateInternals() {
         throw new UnsupportedOperationException(
-            "WindowingInternals#keyedState() is not yet supported.");
-
+            "WindowingInternals#stateInternals() is not yet supported.");
       }
 
       @Override
-      public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException {
+      public TimerInternals timerInternals() {
         throw new UnsupportedOperationException(
-            "WindowingInternals#writeToTagList() is not yet supported.");
+            "WindowingInternals#timerInternals() is not yet supported.");
       }
 
       @Override
-      public <T> void writeToTagList(CodedTupleTag<T> tag, T value, Instant timestamp)
-          throws IOException {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#writeToTagList() is not yet supported.");
-      }
-
-      @Override
-      public <T> void deleteTagList(CodedTupleTag<T> tag) {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#deleteTagList() is not yet supported.");
-      }
-
-      @Override
-      public <T> Iterable<T> readTagList(CodedTupleTag<T> tag) throws IOException {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#readTagList() is not yet supported.");
-      }
-
-      @Override
-      public <T> Map<CodedTupleTag<T>, Iterable<T>> readTagList(List<CodedTupleTag<T>> tags)
-          throws IOException {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#readTagList() is not yet supported.");
-      }
-
-      @Override
-      public TimerManager getTimerManager() {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#getTimerManager() is not yet supported.");
+      public PaneInfo pane() {
+        return PaneInfo.DEFAULT;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 2c61a42..ee300fd 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -35,7 +35,6 @@ import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
@@ -511,11 +510,8 @@ public final class TransformTranslator {
     return new TransformEvaluator<Window.Bound<T>>() {
       @Override
       public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
-        if (transform.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) {
-          context.setOutputRDD(transform, context.getInputRDD(transform));
-        } else {
-          throw new UnsupportedOperationException("Non-global windowing not supported");
-        }
+        // TODO: detect and support non-global windows
+        context.setOutputRDD(transform, context.getInputRDD(transform));
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
index 680d8b7..35ab26e 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
@@ -15,7 +15,7 @@
 
 package com.cloudera.dataflow.spark;
 
-import com.google.cloud.dataflow.examples.TfIdf;
+import com.google.cloud.dataflow.examples.complete.TfIdf;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
index 0251808..540bdd9 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
@@ -23,6 +23,7 @@ import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.base.Charsets;
+import java.util.Collections;
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -76,6 +77,10 @@ public class TransformTranslatorTest {
     List<String> sparkOutput =
         Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
 
+    // sort output to get a stable result (PCollections are not ordered)
+    Collections.sort(directOutput);
+    Collections.sort(sparkOutput);
+
     Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
   }