You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:43 UTC
[18/50] [abbrv] incubator-beam git commit: Update to dataflow 0.4.150727.
Update to dataflow 0.4.150727.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89945bf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89945bf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89945bf6
Branch: refs/heads/master
Commit: 89945bf676affe2cd52fed91551cb1037fc2faae
Parents: b83d666
Author: Tom White <to...@cloudera.com>
Authored: Wed Aug 5 18:10:59 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:15 2016 +0000
----------------------------------------------------------------------
runners/spark/pom.xml | 2 +-
.../dataflow/spark/SparkProcessContext.java | 53 ++++++--------------
.../dataflow/spark/TransformTranslator.java | 8 +--
.../com/cloudera/dataflow/spark/TfIdfTest.java | 2 +-
.../dataflow/spark/TransformTranslatorTest.java | 5 ++
5 files changed, 24 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index de9efb9..74b0fed 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -23,7 +23,7 @@ License.
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
- <google-cloud-dataflow-version>0.4.150710</google-cloud-dataflow-version>
+ <google-cloud-dataflow-version>0.4.150727</google-cloud-dataflow-version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index bda838c..259f90c 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -19,7 +19,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -29,10 +28,11 @@ import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.util.TimerManager;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
+import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.collect.AbstractIterator;
@@ -126,6 +126,11 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
}
@Override
+ public PaneInfo pane() {
+ return PaneInfo.DEFAULT;
+ }
+
+ @Override
public WindowingInternals<I, O> windowingInternals() {
return new WindowingInternals<I, O>() {
@@ -136,53 +141,25 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
@Override
public void outputWindowedValue(O output, Instant timestamp, Collection<?
- extends BoundedWindow> windows) {
+ extends BoundedWindow> windows, PaneInfo paneInfo) {
output(output);
}
@Override
- public KeyedState keyedState() {
+ public StateInternals stateInternals() {
throw new UnsupportedOperationException(
- "WindowingInternals#keyedState() is not yet supported.");
-
+ "WindowingInternals#stateInternals() is not yet supported.");
}
@Override
- public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException {
+ public TimerInternals timerInternals() {
throw new UnsupportedOperationException(
- "WindowingInternals#writeToTagList() is not yet supported.");
+ "WindowingInternals#timerInternals() is not yet supported.");
}
@Override
- public <T> void writeToTagList(CodedTupleTag<T> tag, T value, Instant timestamp)
- throws IOException {
- throw new UnsupportedOperationException(
- "WindowingInternals#writeToTagList() is not yet supported.");
- }
-
- @Override
- public <T> void deleteTagList(CodedTupleTag<T> tag) {
- throw new UnsupportedOperationException(
- "WindowingInternals#deleteTagList() is not yet supported.");
- }
-
- @Override
- public <T> Iterable<T> readTagList(CodedTupleTag<T> tag) throws IOException {
- throw new UnsupportedOperationException(
- "WindowingInternals#readTagList() is not yet supported.");
- }
-
- @Override
- public <T> Map<CodedTupleTag<T>, Iterable<T>> readTagList(List<CodedTupleTag<T>> tags)
- throws IOException {
- throw new UnsupportedOperationException(
- "WindowingInternals#readTagList() is not yet supported.");
- }
-
- @Override
- public TimerManager getTimerManager() {
- throw new UnsupportedOperationException(
- "WindowingInternals#getTimerManager() is not yet supported.");
+ public PaneInfo pane() {
+ return PaneInfo.DEFAULT;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 2c61a42..ee300fd 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -35,7 +35,6 @@ import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
@@ -511,11 +510,8 @@ public final class TransformTranslator {
return new TransformEvaluator<Window.Bound<T>>() {
@Override
public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
- if (transform.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) {
- context.setOutputRDD(transform, context.getInputRDD(transform));
- } else {
- throw new UnsupportedOperationException("Non-global windowing not supported");
- }
+ // TODO: detect and support non-global windows
+ context.setOutputRDD(transform, context.getInputRDD(transform));
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
index 680d8b7..35ab26e 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java
@@ -15,7 +15,7 @@
package com.cloudera.dataflow.spark;
-import com.google.cloud.dataflow.examples.TfIdf;
+import com.google.cloud.dataflow.examples.complete.TfIdf;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
index 0251808..540bdd9 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
@@ -23,6 +23,7 @@ import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Charsets;
+import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -76,6 +77,10 @@ public class TransformTranslatorTest {
List<String> sparkOutput =
Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
+ // sort output to get a stable result (PCollections are not ordered)
+ Collections.sort(directOutput);
+ Collections.sort(sparkOutput);
+
Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
}