You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:49 UTC
[24/50] [abbrv] incubator-beam git commit: Update to dataflow 0.4.150710.
Update to dataflow 0.4.150710.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c01421ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c01421ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c01421ce
Branch: refs/heads/master
Commit: c01421ce3375712bae0c0b88b5f2141c67284ade
Parents: 3cae69b
Author: Tom White <to...@cloudera.com>
Authored: Mon Jul 13 21:08:57 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:15 2016 +0000
----------------------------------------------------------------------
runners/spark/pom.xml | 2 +-
.../com/cloudera/dataflow/spark/SparkProcessContext.java | 8 ++++----
.../com/cloudera/dataflow/spark/TransformTranslator.java | 8 ++++----
.../test/java/com/cloudera/dataflow/spark/NumShardsTest.java | 4 +++-
.../java/com/cloudera/dataflow/spark/SerializationTest.java | 4 ++--
5 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 0382108..3bce8c0 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -23,7 +23,7 @@ License.
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
- <google-cloud-dataflow-version>0.4.150602</google-cloud-dataflow-version>
+ <google-cloud-dataflow-version>0.4.150710</google-cloud-dataflow-version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index ee51c35..d0e9d6a 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -148,14 +148,14 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
}
@Override
- public <T> void store(CodedTupleTag<T> tag, T value, Instant timestamp)
- throws IOException {
+ public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException {
throw new UnsupportedOperationException(
- "WindowingInternals#store() is not yet supported.");
+ "WindowingInternals#writeToTagList() is not yet supported.");
}
@Override
- public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException {
+ public <T> void writeToTagList(CodedTupleTag<T> tag, T value, Instant timestamp)
+ throws IOException {
throw new UnsupportedOperationException(
"WindowingInternals#writeToTagList() is not yet supported.");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index e1af3cf..f137218 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -520,10 +520,10 @@ public final class TransformTranslator {
};
}
- private static <T> TransformEvaluator<Create<T>> create() {
- return new TransformEvaluator<Create<T>>() {
+ private static <T> TransformEvaluator<Create.Values<T>> create() {
+ return new TransformEvaluator<Create.Values<T>>() {
@Override
- public void evaluate(Create<T> transform, EvaluationContext context) {
+ public void evaluate(Create.Values<T> transform, EvaluationContext context) {
Iterable<T> elems = transform.getElements();
// Use a coder to convert the objects in the PCollection to byte arrays, so they
// can be transferred over the network.
@@ -624,7 +624,7 @@ public final class TransformTranslator {
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
- EVALUATORS.put(Create.class, create());
+ EVALUATORS.put(Create.Values.class, create());
EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
EVALUATORS.put(View.AsIterable.class, viewAsIter());
EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
index 8985e66..9572b0f 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java
@@ -20,6 +20,7 @@ import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
@@ -62,7 +63,8 @@ public class NumShardsTest {
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
- PCollection<String> output = inputWords.apply(new WordCount.CountWords());
+ PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+ .apply(ParDo.of(new WordCount.FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
EvaluationResult res = SparkPipelineRunner.create().run(p);
res.close();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
index 40591e5..bd1a4e8 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
@@ -112,8 +112,8 @@ public class SerializationTest {
SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
- PCollection<StringHolder> inputWords = p.apply(Create.of(WORDS)).setCoder
- (StringHolderUtf8Coder.of());
+ PCollection<StringHolder> inputWords =
+ p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of()));
PCollection<StringHolder> output = inputWords.apply(new CountWords());
DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);