You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:59:13 UTC
[48/50] [abbrv] incubator-beam git commit: Add support for
Flattening (union) PCollections and test
Add support for Flattening (union) PCollections and test
Wrong package utils
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34787303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34787303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34787303
Branch: refs/heads/master
Commit: 34787303499499b5b4e8d616e0ec71784940b44e
Parents: 7a2e9a7
Author: Sela <an...@paypal.com>
Authored: Sat Jan 16 13:00:11 2016 +0200
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:17 2016 +0000
----------------------------------------------------------------------
.../streaming/StreamingEvaluationContext.java | 8 +-
.../streaming/StreamingTransformTranslator.java | 86 ++++++++++++--------
.../spark/streaming/FlattenStreamingTest.java | 86 ++++++++++++++++++++
.../streaming/SimpleStreamingWordCountTest.java | 2 -
4 files changed, 144 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
index 5e1b42d..3290729 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
@@ -116,7 +116,10 @@ public class StreamingEvaluationContext extends EvaluationContext {
}
public JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
- PValue pvalue = (PValue) getInput(transform);
+ return getStream((PValue) getInput(transform));
+ }
+
+ public JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
leafStreams.remove(dStreamHolder);
@@ -175,6 +178,9 @@ public class StreamingEvaluationContext extends EvaluationContext {
}
//---------------- override in order to expose in package
+ protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+ return super.getInput(transform);
+ }
@Override
protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
return super.getOutput(transform);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
index 20ee88a..0153f38 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
@@ -17,9 +17,11 @@ package com.cloudera.dataflow.spark.streaming;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.api.client.util.Sets;
import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -40,6 +42,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.values.PDone;
import kafka.serializer.Decoder;
@@ -83,8 +86,7 @@ public final class StreamingTransformTranslator {
JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
(JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
((StreamingEvaluationContext) context).getStream(transform);
- dstream.map(WindowingHelpers.<T>unwindowFunction())
- .print(transform.getNum());
+ dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
}
};
}
@@ -93,7 +95,8 @@ public final class StreamingTransformTranslator {
return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
@Override
public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
- JavaStreamingContext jssc = ((StreamingEvaluationContext) context).getStreamingContext();
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ JavaStreamingContext jssc = sec.getStreamingContext();
Class<K> keyClazz = transform.getKeyClass();
Class<V> valueClazz = transform.getValueClass();
Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
@@ -109,7 +112,7 @@ public final class StreamingTransformTranslator {
return KV.of(t2._1(), t2._2());
}
}).map(WindowingHelpers.<KV<K, V>>windowFunction());
- ((StreamingEvaluationContext) context).setStream(transform, inputStream);
+ sec.setStream(transform, inputStream);
}
};
}
@@ -121,19 +124,18 @@ public final class StreamingTransformTranslator {
@Override
public void evaluate(com.google.cloud.dataflow.sdk.transforms.Create.Values<T>
transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
Iterable<T> elems = transform.getElements();
- Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
- .getCoder();
+ Coder<T> coder = sec.getOutput(transform).getCoder();
if (coder != VoidCoder.of()) {
// actual create
- ((StreamingEvaluationContext) context).setOutputRDDFromValues(transform,
- elems, coder);
+ sec.setOutputRDDFromValues(transform, elems, coder);
} else {
// fake create as an input
// creates a stream with a single batch containing a single null element
// to invoke following transformations once
// to support DataflowAssert
- ((StreamingEvaluationContext) context).setDStreamFromQueue(transform,
+ sec.setDStreamFromQueue(transform,
Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
(Coder<Void>) coder);
}
@@ -144,13 +146,30 @@ public final class StreamingTransformTranslator {
private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
@Override
- public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext
- context) {
+ public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
Iterable<Iterable<T>> values = transform.getQueuedValues();
- Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
- .getCoder();
- ((StreamingEvaluationContext) context).setDStreamFromQueue(transform, values,
- coder);
+ Coder<T> coder = sec.getOutput(transform).getCoder();
+ sec.setDStreamFromQueue(transform, values, coder);
+ }
+ };
+ }
+
+ private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
+ return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ PCollectionList<T> pcs = sec.getInput(transform);
+ JavaDStream<WindowedValue<T>> first =
+ (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
+ List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
+ for (int i = 1; i < pcs.size(); i++) {
+ rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
+ }
+ JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
+ sec.setStream(transform, dstream);
}
};
}
@@ -165,14 +184,14 @@ public final class StreamingTransformTranslator {
final TransformEvaluator rddEvaluator =
rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
- if (((StreamingEvaluationContext) context).hasStream(transform)) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ if (sec.hasStream(transform)) {
JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
(JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
- ((StreamingEvaluationContext) context).getStream(transform);
+ sec.getStream(transform);
- ((StreamingEvaluationContext) context).setStream(transform, dStream
- .transform(new RDDTransform<>((StreamingEvaluationContext) context,
- rddEvaluator, transform)));
+ sec.setStream(transform, dStream
+ .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
} else {
// if the transformation requires direct access to RDD (not in stream)
// this is used for "fake" transformations like with DataflowAssert
@@ -235,13 +254,13 @@ public final class StreamingTransformTranslator {
final TransformEvaluator rddEvaluator =
rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
- if (((StreamingEvaluationContext) context).hasStream(transform)) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ if (sec.hasStream(transform)) {
JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
- (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) (
- (StreamingEvaluationContext) context).getStream(transform);
+ (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+ sec.getStream(transform);
- dStream.foreachRDD(new RDDOutputOperator<>((StreamingEvaluationContext) context,
- rddEvaluator, transform));
+ dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform));
} else {
rddEvaluator.evaluate(transform, context);
}
@@ -290,24 +309,22 @@ public final class StreamingTransformTranslator {
return new TransformEvaluator<Window.Bound<T>>() {
@Override
public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
//--- first we apply windowing to the stream
WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
@SuppressWarnings("unchecked")
JavaDStream<WindowedValue<T>> dStream =
- (JavaDStream<WindowedValue<T>>)
- ((StreamingEvaluationContext) context).getStream(transform);
+ (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
if (windowFn instanceof FixedWindows) {
Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
.getMillis());
- ((StreamingEvaluationContext) context)
- .setStream(transform, dStream.window(windowDuration));
+ sec.setStream(transform, dStream.window(windowDuration));
} else if (windowFn instanceof SlidingWindows) {
Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
.getMillis());
Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
.getMillis());
- ((StreamingEvaluationContext) context)
- .setStream(transform, dStream.window(windowDuration, slideDuration));
+ sec.setStream(transform, dStream.window(windowDuration, slideDuration));
}
//--- then we apply windowing to the elements
DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
@@ -316,10 +333,9 @@ public final class StreamingTransformTranslator {
@SuppressWarnings("unchecked")
JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
(JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
- ((StreamingEvaluationContext) context).getStream(transform);
+ sec.getStream(transform);
//noinspection unchecked
- ((StreamingEvaluationContext) context).setStream(transform,
- dstream.mapPartitions(dofn));
+ sec.setStream(transform, dstream.mapPartitions(dofn));
}
};
}
@@ -333,6 +349,7 @@ public final class StreamingTransformTranslator {
EVALUATORS.put(Create.Values.class, create());
EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
EVALUATORS.put(Window.Bound.class, window());
+ EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
}
private static final Set<Class<? extends PTransform>> UNSUPPORTTED_EVALUATORS = Sets
@@ -346,7 +363,6 @@ public final class StreamingTransformTranslator {
UNSUPPORTTED_EVALUATORS.add(AvroIO.Write.Bound.class);
UNSUPPORTTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
UNSUPPORTTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
- UNSUPPORTTED_EVALUATORS.add(Flatten.FlattenPCollectionList.class);
}
private static <PT extends PTransform<?, ?>> boolean hasTransformEvaluator(Class<PT> clazz) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
new file mode 100644
index 0000000..d818e9a
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+import com.cloudera.dataflow.io.CreateStream;
+import com.cloudera.dataflow.spark.EvaluationResult;
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptions;
+import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsFactory;
+import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test Flatten (union) implementation for streaming.
+ */
+public class FlattenStreamingTest {
+
+ private static final String[] WORDS_ARRAY_1 = {
+ "one", "two", "three", "four"};
+ private static final List<Iterable<String>> WORDS_QUEUE_1 =
+ Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1));
+ private static final String[] WORDS_ARRAY_2 = {
+ "five", "six", "seven", "eight"};
+ private static final List<Iterable<String>> WORDS_QUEUE_2 =
+ Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));
+ private static final String[] EXPECTED_UNION = {
+ "one", "two", "three", "four", "five", "six", "seven", "eight"};
+ final static long TEST_TIMEOUT_MSEC = 1000L;
+
+ @Test
+ public void testRun() throws Exception {
+ SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+ options.setAppName(this.getClass().getSimpleName());
+ options.setRunner(SparkPipelineRunner.class);
+ options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+ Pipeline p = Pipeline.create(options);
+
+ PCollection<String> w1 =
+ p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
+ PCollection<String> windowedW1 =
+ w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+ PCollection<String> w2 =
+ p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
+ PCollection<String> windowedW2 =
+ w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+ PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
+ PCollection<String> union = list.apply(Flatten.<String>pCollections());
+
+ DataflowAssert.thatIterable(union.apply(View.<String>asIterable()))
+ .containsInAnyOrder(EXPECTED_UNION);
+
+ EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+ res.close();
+
+ DataflowAssertStreaming.assertNoFailures(res);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
index 613e517..eb23b5a 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
@@ -23,7 +23,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.ImmutableSet;
-import com.cloudera.dataflow.io.ConsoleIO;
import com.cloudera.dataflow.io.CreateStream;
import com.cloudera.dataflow.spark.EvaluationResult;
import com.cloudera.dataflow.spark.SimpleWordCountTest;
@@ -31,7 +30,6 @@ import com.cloudera.dataflow.spark.SparkPipelineRunner;
import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
import org.joda.time.Duration;
-import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;