You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:59:13 UTC

[48/50] [abbrv] incubator-beam git commit: Add support for Flattening (union) PCollections and test

Add support for Flattening (union) PCollections and test

Wrong package utils


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34787303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34787303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34787303

Branch: refs/heads/master
Commit: 34787303499499b5b4e8d616e0ec71784940b44e
Parents: 7a2e9a7
Author: Sela <an...@paypal.com>
Authored: Sat Jan 16 13:00:11 2016 +0200
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:17 2016 +0000

----------------------------------------------------------------------
 .../streaming/StreamingEvaluationContext.java   |  8 +-
 .../streaming/StreamingTransformTranslator.java | 86 ++++++++++++--------
 .../spark/streaming/FlattenStreamingTest.java   | 86 ++++++++++++++++++++
 .../streaming/SimpleStreamingWordCountTest.java |  2 -
 4 files changed, 144 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
index 5e1b42d..3290729 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
@@ -116,7 +116,10 @@ public class StreamingEvaluationContext extends EvaluationContext {
   }
 
   public JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
-    PValue pvalue = (PValue) getInput(transform);
+    return getStream((PValue) getInput(transform));
+  }
+
+  public JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
     DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
     JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
     leafStreams.remove(dStreamHolder);
@@ -175,6 +178,9 @@ public class StreamingEvaluationContext extends EvaluationContext {
   }
 
   //---------------- override in order to expose in package
+  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+    return super.getInput(transform);
+  }
   @Override
   protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
     return super.getOutput(transform);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
index 20ee88a..0153f38 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
@@ -17,9 +17,11 @@ package com.cloudera.dataflow.spark.streaming;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.api.client.util.Lists;
 import com.google.api.client.util.Maps;
 import com.google.api.client.util.Sets;
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -40,6 +42,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
 import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
 import com.google.cloud.dataflow.sdk.values.PDone;
 
 import kafka.serializer.Decoder;
@@ -83,8 +86,7 @@ public final class StreamingTransformTranslator {
         JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
             (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
             ((StreamingEvaluationContext) context).getStream(transform);
-        dstream.map(WindowingHelpers.<T>unwindowFunction())
-            .print(transform.getNum());
+        dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
       }
     };
   }
@@ -93,7 +95,8 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
       @Override
       public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
-        JavaStreamingContext jssc = ((StreamingEvaluationContext) context).getStreamingContext();
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        JavaStreamingContext jssc = sec.getStreamingContext();
         Class<K> keyClazz = transform.getKeyClass();
         Class<V> valueClazz = transform.getValueClass();
         Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
@@ -109,7 +112,7 @@ public final class StreamingTransformTranslator {
             return KV.of(t2._1(), t2._2());
           }
         }).map(WindowingHelpers.<KV<K, V>>windowFunction());
-        ((StreamingEvaluationContext) context).setStream(transform, inputStream);
+        sec.setStream(transform, inputStream);
       }
     };
   }
@@ -121,19 +124,18 @@ public final class StreamingTransformTranslator {
       @Override
       public void evaluate(com.google.cloud.dataflow.sdk.transforms.Create.Values<T>
                                    transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         Iterable<T> elems = transform.getElements();
-        Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
-                .getCoder();
+        Coder<T> coder = sec.getOutput(transform).getCoder();
         if (coder != VoidCoder.of()) {
           // actual create
-          ((StreamingEvaluationContext) context).setOutputRDDFromValues(transform,
-                  elems, coder);
+          sec.setOutputRDDFromValues(transform, elems, coder);
         } else {
           // fake create as an input
           // creates a stream with a single batch containing a single null element
           // to invoke following transformations once
           // to support DataflowAssert
-          ((StreamingEvaluationContext) context).setDStreamFromQueue(transform,
+          sec.setDStreamFromQueue(transform,
               Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
               (Coder<Void>) coder);
         }
@@ -144,13 +146,30 @@ public final class StreamingTransformTranslator {
   private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
     return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
       @Override
-      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext
-              context) {
+      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         Iterable<Iterable<T>> values = transform.getQueuedValues();
-        Coder<T> coder = ((StreamingEvaluationContext) context).getOutput(transform)
-            .getCoder();
-        ((StreamingEvaluationContext) context).setDStreamFromQueue(transform, values,
-            coder);
+        Coder<T> coder = sec.getOutput(transform).getCoder();
+        sec.setDStreamFromQueue(transform, values, coder);
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
+    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        PCollectionList<T> pcs = sec.getInput(transform);
+        JavaDStream<WindowedValue<T>> first =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
+        List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
+        for (int i = 1; i < pcs.size(); i++) {
+          rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
+        }
+        JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
+        sec.setStream(transform, dstream);
       }
     };
   }
@@ -165,14 +184,14 @@ public final class StreamingTransformTranslator {
         final TransformEvaluator rddEvaluator =
             rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
 
-        if (((StreamingEvaluationContext) context).hasStream(transform)) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        if (sec.hasStream(transform)) {
           JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
               (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
-              ((StreamingEvaluationContext) context).getStream(transform);
+              sec.getStream(transform);
 
-          ((StreamingEvaluationContext) context).setStream(transform, dStream
-              .transform(new RDDTransform<>((StreamingEvaluationContext) context,
-              rddEvaluator, transform)));
+          sec.setStream(transform, dStream
+              .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
         } else {
           // if the transformation requires direct access to RDD (not in stream)
           // this is used for "fake" transformations like with DataflowAssert
@@ -235,13 +254,13 @@ public final class StreamingTransformTranslator {
         final TransformEvaluator rddEvaluator =
             rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
 
-        if (((StreamingEvaluationContext) context).hasStream(transform)) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        if (sec.hasStream(transform)) {
           JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
-              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) (
-              (StreamingEvaluationContext) context).getStream(transform);
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+              sec.getStream(transform);
 
-          dStream.foreachRDD(new RDDOutputOperator<>((StreamingEvaluationContext) context,
-              rddEvaluator, transform));
+          dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform));
         } else {
           rddEvaluator.evaluate(transform, context);
         }
@@ -290,24 +309,22 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<Window.Bound<T>>() {
       @Override
       public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         //--- first we apply windowing to the stream
         WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<T>> dStream =
-            (JavaDStream<WindowedValue<T>>)
-            ((StreamingEvaluationContext) context).getStream(transform);
+            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
         if (windowFn instanceof FixedWindows) {
           Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
               .getMillis());
-          ((StreamingEvaluationContext) context)
-              .setStream(transform, dStream.window(windowDuration));
+          sec.setStream(transform, dStream.window(windowDuration));
         } else if (windowFn instanceof SlidingWindows) {
           Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
               .getMillis());
           Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
               .getMillis());
-          ((StreamingEvaluationContext) context)
-              .setStream(transform, dStream.window(windowDuration, slideDuration));
+          sec.setStream(transform, dStream.window(windowDuration, slideDuration));
         }
         //--- then we apply windowing to the elements
         DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
@@ -316,10 +333,9 @@ public final class StreamingTransformTranslator {
         @SuppressWarnings("unchecked")
         JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
             (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
-            ((StreamingEvaluationContext) context).getStream(transform);
+            sec.getStream(transform);
         //noinspection unchecked
-        ((StreamingEvaluationContext) context).setStream(transform,
-             dstream.mapPartitions(dofn));
+        sec.setStream(transform, dstream.mapPartitions(dofn));
       }
     };
   }
@@ -333,6 +349,7 @@ public final class StreamingTransformTranslator {
     EVALUATORS.put(Create.Values.class, create());
     EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
     EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
   }
 
   private static final Set<Class<? extends PTransform>> UNSUPPORTTED_EVALUATORS = Sets
@@ -346,7 +363,6 @@ public final class StreamingTransformTranslator {
     UNSUPPORTTED_EVALUATORS.add(AvroIO.Write.Bound.class);
     UNSUPPORTTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
     UNSUPPORTTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
-    UNSUPPORTTED_EVALUATORS.add(Flatten.FlattenPCollectionList.class);
   }
 
   private static <PT extends PTransform<?, ?>> boolean hasTransformEvaluator(Class<PT> clazz) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
new file mode 100644
index 0000000..d818e9a
--- /dev/null
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+import com.cloudera.dataflow.io.CreateStream;
+import com.cloudera.dataflow.spark.EvaluationResult;
+import com.cloudera.dataflow.spark.SparkPipelineRunner;
+import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptions;
+import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsFactory;
+import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test Flatten (union) implementation for streaming.
+ */
+public class FlattenStreamingTest {
+
+  private static final String[] WORDS_ARRAY_1 = {
+      "one", "two", "three", "four"};
+  private static final List<Iterable<String>> WORDS_QUEUE_1 =
+      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1));
+  private static final String[] WORDS_ARRAY_2 = {
+          "five", "six", "seven", "eight"};
+  private static final List<Iterable<String>> WORDS_QUEUE_2 =
+          Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));
+  private static final String[] EXPECTED_UNION = {
+          "one", "two", "three", "four", "five", "six", "seven", "eight"};
+  final static long TEST_TIMEOUT_MSEC = 1000L;
+
+  @Test
+  public void testRun() throws Exception {
+    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
+    options.setAppName(this.getClass().getSimpleName());
+    options.setRunner(SparkPipelineRunner.class);
+    options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<String> w1 =
+            p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedW1 =
+            w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+    PCollection<String> w2 =
+            p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedW2 =
+            w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+    PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
+    PCollection<String> union = list.apply(Flatten.<String>pCollections());
+
+    DataflowAssert.thatIterable(union.apply(View.<String>asIterable()))
+            .containsInAnyOrder(EXPECTED_UNION);
+
+    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    res.close();
+
+    DataflowAssertStreaming.assertNoFailures(res);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34787303/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
index 613e517..eb23b5a 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
@@ -23,7 +23,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.collect.ImmutableSet;
 
-import com.cloudera.dataflow.io.ConsoleIO;
 import com.cloudera.dataflow.io.CreateStream;
 import com.cloudera.dataflow.spark.EvaluationResult;
 import com.cloudera.dataflow.spark.SimpleWordCountTest;
@@ -31,7 +30,6 @@ import com.cloudera.dataflow.spark.SparkPipelineRunner;
 import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
 
 import org.joda.time.Duration;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;