Posted to commits@beam.apache.org by jb...@apache.org on 2017/01/04 12:33:48 UTC

[1/2] beam git commit: [BEAM-1177] Input DStream "bundles" are now in serialized form and include relevant metadata.

Repository: beam
Updated Branches:
  refs/heads/master 5c612272d -> d1d85dfc7


[BEAM-1177] Input DStream "bundles" are now in serialized form and include relevant metadata.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a49fbcac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a49fbcac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a49fbcac

Branch: refs/heads/master
Commit: a49fbcacaf972a7b919d1544c5e3c83389fc3291
Parents: 5c61227
Author: Sela <an...@paypal.com>
Authored: Sun Dec 18 14:36:53 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:05:34 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java |   4 +-
 .../runners/spark/io/SparkUnboundedSource.java  | 127 +++++++++++++------
 .../spark/stateful/StateSpecFunctions.java      |  37 ++++--
 3 files changed, 121 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
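
At a glance, each microbatch read now yields a Tuple2<Iterable<byte[]>, Metadata>: the elements are coder-serialized WindowedValues, paired with the record count and watermark observed during the read. The following is a minimal sketch of that bundle shape, assuming the public Metadata class introduced in this patch; the byte contents are placeholders, not real encoded elements.

    import java.util.Arrays;
    import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
    import org.joda.time.Instant;
    import scala.Tuple2;

    public class BundleShapeSketch {
      public static void main(String[] args) {
        // Placeholder bytes; in the patch these are coder-encoded WindowedValues.
        Iterable<byte[]> serializedElements =
            Arrays.asList(new byte[] {1, 2}, new byte[] {3, 4});
        // Pair the serialized elements with what was observed while reading them.
        Tuple2<Iterable<byte[]>, Metadata> bundle =
            new Tuple2<>(serializedElements, new Metadata(2, Instant.now()));
        System.out.println(bundle._2().getNumRecords() + " records in this bundle");
      }
    }
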


http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index f42cca3..ff818a1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -258,8 +258,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       return (CheckpointMarkT) reader.getCheckpointMark();
     }
 
-    public long getNumRecordsRead() {
-      return recordsRead;
+    public Instant getWatermark() {
+      return reader.getWatermark();
     }
   }
 }
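
The reader now surfaces the source watermark instead of a record count (the count moves into the per-bundle Metadata, below). A minimal sketch of the delegation this hunk adds, using a hypothetical WatermarkDelegatingReader to stand in for MicrobatchSource.Reader:

    import org.apache.beam.sdk.io.UnboundedSource;
    import org.joda.time.Instant;

    // Hypothetical stand-in for MicrobatchSource.Reader: the new accessor
    // simply delegates to the wrapped UnboundedReader's watermark.
    class WatermarkDelegatingReader {
      private final UnboundedSource.UnboundedReader<?> reader;

      WatermarkDelegatingReader(UnboundedSource.UnboundedReader<?> reader) {
        this.reader = reader;
      }

      // Mirrors the getWatermark() added in the hunk above.
      public Instant getWatermark() {
        return reader.getWatermark();
      }
    }
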

http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 394b023..f03dc8c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -18,16 +18,20 @@
 
 package org.apache.beam.runners.spark.io;
 
+import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.StateSpec;
@@ -39,6 +43,10 @@ import org.apache.spark.streaming.api.java.JavaPairInputDStream$;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.dstream.DStream;
 import org.apache.spark.streaming.scheduler.StreamInputInfo;
+import org.joda.time.Instant;
+
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
 
 
 /**
@@ -75,20 +83,39 @@ public class SparkUnboundedSource {
 
     // call mapWithState to read from a checkpointable source.
     JavaMapWithStateDStream<Source<T>, CheckpointMarkT, byte[],
-        Iterator<WindowedValue<T>>> mapWithStateDStream = inputDStream.mapWithState(
+        Tuple2<Iterable<byte[]>, Metadata>> mapWithStateDStream = inputDStream.mapWithState(
             StateSpec.function(StateSpecFunctions.<T, CheckpointMarkT>mapSourceFunction(rc)));
 
     // set checkpoint duration for read stream, if set.
     checkpointStream(mapWithStateDStream, options);
-    // flatmap and report read elements. Use the inputDStream's id to tie between the reported
-    // info and the inputDStream it originated from.
-    int id = inputDStream.inputDStream().id();
-    ReportingFlatMappedDStream<WindowedValue<T>> reportingFlatMappedDStream =
-        new ReportingFlatMappedDStream<>(mapWithStateDStream.dstream(), id,
-            getSourceName(source, id));
+    // cache since checkpointing is less frequent.
+    mapWithStateDStream.cache();
 
-    return JavaDStream.fromDStream(reportingFlatMappedDStream,
-        JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+    // report the number of input elements for this InputDStream to the InputInfoTracker.
+    int id = inputDStream.inputDStream().id();
+    JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(
+        new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() {
+          @Override
+          public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+            return t2._2();
+          }
+        });
+
+    // register the ReportingDStream op.
+    new ReportingDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
+
+    // output the actual (deserialized) stream.
+    WindowedValue.FullWindowedValueCoder<T> coder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getDefaultOutputCoder(),
+            GlobalWindow.Coder.INSTANCE);
+    return mapWithStateDStream.flatMap(
+        new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
+          @Override
+          public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+            return t2._1();
+          }
+        }).map(CoderHelpers.fromByteFunction(coder));
   }
 
   private static <T> String getSourceName(Source<T> source, int id) {
@@ -111,20 +138,20 @@ public class SparkUnboundedSource {
   }
 
   /**
-   * A flatMap DStream function that "flattens" the Iterators read by the
-   * {@link MicrobatchSource.Reader}s, while reporting the properties of the read to the
-   * {@link org.apache.spark.streaming.scheduler.InputInfoTracker} for RateControl purposes
-   * and visibility.
+   * A DStream function that reports the properties of the read to the
+   * {@link org.apache.spark.streaming.scheduler.InputInfoTracker}
+   * for RateControl purposes and visibility.
    */
-  private static class ReportingFlatMappedDStream<T> extends DStream<T> {
-    private final DStream<Iterator<T>> parent;
+  private static class ReportingDStream extends DStream<BoxedUnit> {
+    private final DStream<Metadata> parent;
     private final int inputDStreamId;
     private final String sourceName;
 
-    ReportingFlatMappedDStream(DStream<Iterator<T>> parent,
-                               int inputDStreamId,
-                               String sourceName) {
-      super(parent.ssc(), JavaSparkContext$.MODULE$.<T>fakeClassTag());
+    ReportingDStream(
+        DStream<Metadata> parent,
+        int inputDStreamId,
+        String sourceName) {
+      super(parent.ssc(), JavaSparkContext$.MODULE$.<BoxedUnit>fakeClassTag());
       this.parent = parent;
       this.inputDStreamId = inputDStreamId;
       this.sourceName = sourceName;
@@ -142,31 +169,59 @@ public class SparkUnboundedSource {
     }
 
     @Override
-    public scala.Option<RDD<T>> compute(Time validTime) {
+    public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
       // compute parent.
-      scala.Option<RDD<Iterator<T>>> computedParentRDD = parent.getOrCompute(validTime);
-      // compute this DStream - take single-iterator partitions and flatMap them.
-      if (computedParentRDD.isDefined()) {
-        RDD<T> computedRDD = computedParentRDD.get().toJavaRDD()
-            .flatMap(TranslationUtils.<T>flattenIter()).rdd().cache();
-        // report - for RateEstimator and visibility.
-        report(validTime, computedRDD.count());
-        return scala.Option.apply(computedRDD);
-      } else {
-        report(validTime, 0);
-        return scala.Option.empty();
+      scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
+      long count = 0;
+      Instant globalWatermark = new Instant(Long.MIN_VALUE);
+      if (parentRDDOpt.isDefined()) {
+        JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
+        for (Metadata metadata: parentRDD.collect()) {
+          count += metadata.getNumRecords();
+          // a monotonically increasing watermark.
+          globalWatermark = globalWatermark.isBefore(metadata.getWatermark())
+              ? metadata.getWatermark() : globalWatermark;
+        }
       }
+      // report - for RateEstimator and visibility.
+      report(validTime, count, globalWatermark);
+      return scala.Option.empty();
     }
 
-    private void report(Time batchTime, long count) {
+    private void report(Time batchTime, long count, Instant watermark) {
       // metadata - #records read and a description.
       scala.collection.immutable.Map<String, Object> metadata =
           new scala.collection.immutable.Map.Map1<String, Object>(
               StreamInputInfo.METADATA_KEY_DESCRIPTION(),
-                  String.format("Read %d records from %s for batch time: %s", count, sourceName,
-                      batchTime));
+              String.format(
+                  "Read %d records with observed watermark %s, from %s for batch time: %s",
+                  count,
+                  watermark,
+                  sourceName,
+                  batchTime));
       StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
       ssc().scheduler().inputInfoTracker().reportInfo(batchTime, streamInputInfo);
     }
   }
+
+  /**
+   * A metadata holder for an input stream partition.
+   */
+  public static class Metadata implements Serializable {
+    private final long numRecords;
+    private final Instant watermark;
+
+    public Metadata(long numRecords, Instant watermark) {
+      this.numRecords = numRecords;
+      this.watermark = watermark;
+    }
+
+    public long getNumRecords() {
+      return numRecords;
+    }
+
+    public Instant getWatermark() {
+      return watermark;
+    }
+  }
 }
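
ReportingDStream collects the per-partition Metadata, sums the record counts, and keeps the maximum watermark so the reported global watermark only advances; the actual elements are decoded separately via CoderHelpers.fromByteFunction(coder). A standalone sketch of that aggregation, with hypothetical sample values:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
    import org.joda.time.Instant;

    public class MetadataAggregationSketch {
      public static void main(String[] args) {
        // Hypothetical per-partition metadata, as ReportingDStream collects it.
        List<Metadata> partitions = Arrays.asList(
            new Metadata(3, new Instant(1000L)),
            new Metadata(5, new Instant(2500L)));

        long count = 0;
        Instant globalWatermark = new Instant(Long.MIN_VALUE);
        for (Metadata metadata : partitions) {
          count += metadata.getNumRecords();
          // Take the max so the reported watermark never regresses.
          globalWatermark = globalWatermark.isBefore(metadata.getWatermark())
              ? metadata.getWatermark() : globalWatermark;
        }
        // Prints: 8 records, watermark 1970-01-01T00:00:02.500Z
        System.out.println(count + " records, watermark " + globalWatermark);
      }
    }
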

http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 053f4ac..ffe0ddd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
+import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -39,10 +40,12 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.streaming.State;
 import org.apache.spark.streaming.StateSpec;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Option;
+import scala.Tuple2;
 import scala.runtime.AbstractFunction3;
 
 /**
@@ -92,14 +95,17 @@ public class StateSpecFunctions {
    */
   public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>,
-      Iterator<WindowedValue<T>>> mapSourceFunction(final SparkRuntimeContext runtimeContext) {
+      Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
+           final SparkRuntimeContext runtimeContext) {
 
     return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>,
-        Iterator<WindowedValue<T>>>() {
+        Tuple2<Iterable<byte[]>, Metadata>>() {
 
       @Override
-      public Iterator<WindowedValue<T>> apply(Source<T> source, scala.Option<CheckpointMarkT>
-          startCheckpointMark, State<byte[]> state) {
+      public Tuple2<Iterable<byte[]>, Metadata> apply(
+          Source<T> source,
+          scala.Option<CheckpointMarkT> startCheckpointMark,
+          State<byte[]> state) {
         // source as MicrobatchSource
         MicrobatchSource<T, CheckpointMarkT> microbatchSource =
             (MicrobatchSource<T, CheckpointMarkT>) source;
@@ -130,18 +136,25 @@ public class StateSpecFunctions {
           throw new RuntimeException(e);
         }
 
-        // read microbatch.
-        final List<WindowedValue<T>> readValues = new ArrayList<>();
+        // read microbatch as a serialized collection.
+        final List<byte[]> readValues = new ArrayList<>();
+        final Instant watermark;
+        WindowedValue.FullWindowedValueCoder<T> coder =
+            WindowedValue.FullWindowedValueCoder.of(
+                source.getDefaultOutputCoder(),
+                GlobalWindow.Coder.INSTANCE);
         try {
           // measure how long a read takes per-partition.
           Stopwatch stopwatch = Stopwatch.createStarted();
           boolean finished = !reader.start();
           while (!finished) {
-            readValues.add(WindowedValue.of(reader.getCurrent(), reader.getCurrentTimestamp(),
-                GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+            WindowedValue<T> wv = WindowedValue.of(reader.getCurrent(),
+                reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+            readValues.add(CoderHelpers.toByteArray(wv, coder));
             finished = !reader.advance();
           }
 
+          watermark = ((MicrobatchSource.Reader) reader).getWatermark();
           // close and checkpoint reader.
           reader.close();
           LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
@@ -160,7 +173,13 @@ public class StateSpecFunctions {
           throw new RuntimeException("Failed to read from reader.", e);
         }
 
-        return Iterators.unmodifiableIterator(readValues.iterator());
+        Iterable<byte[]> iterable = new Iterable<byte[]>() {
+          @Override
+          public Iterator<byte[]> iterator() {
+            return Iterators.unmodifiableIterator(readValues.iterator());
+          }
+        };
+        return new Tuple2<>(iterable, new Metadata(readValues.size(), watermark));
       }
     };
   }
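
Inside mapSourceFunction, each record is coder-encoded as it is read, so the bundle that crosses Spark's state boundary is already in serialized form. A minimal, self-contained sketch of that encoding loop, assuming StringUtf8Coder elements in place of a real reader:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.runners.spark.coders.CoderHelpers;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.joda.time.Instant;

    public class SerializedReadSketch {
      public static void main(String[] args) {
        WindowedValue.FullWindowedValueCoder<String> coder =
            WindowedValue.FullWindowedValueCoder.of(
                StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

        // Stand-in records; the patch pulls these from reader.getCurrent().
        List<byte[]> readValues = new ArrayList<>();
        for (String record : new String[] {"a", "b", "c"}) {
          WindowedValue<String> wv = WindowedValue.of(
              record, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
          // Encode eagerly so Spark only ever sees byte arrays.
          readValues.add(CoderHelpers.toByteArray(wv, coder));
        }
        // Downstream, CoderHelpers.fromByteFunction(coder) restores each element.
        System.out.println("Encoded " + readValues.size() + " records");
      }
    }
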


[2/2] beam git commit: [BEAM-1177] This closes #1654

Posted by jb...@apache.org.
[BEAM-1177] This closes #1654


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1d85dfc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1d85dfc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1d85dfc

Branch: refs/heads/master
Commit: d1d85dfc76574e925dbefe9dfbf6edb8bcab8597
Parents: 5c61227 a49fbca
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jan 4 13:07:26 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:07:26 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java |   4 +-
 .../runners/spark/io/SparkUnboundedSource.java  | 127 +++++++++++++------
 .../spark/stateful/StateSpecFunctions.java      |  37 ++++--
 3 files changed, 121 insertions(+), 47 deletions(-)
----------------------------------------------------------------------