You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/01/04 12:33:48 UTC
[1/2] beam git commit: [BEAM-1177] Input DStream "bundles" are now in
serialized form and include relevant metadata.
Repository: beam
Updated Branches:
refs/heads/master 5c612272d -> d1d85dfc7
[BEAM-1177] Input DStream "bundles" are now in serialized form and include relevant metadata.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a49fbcac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a49fbcac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a49fbcac
Branch: refs/heads/master
Commit: a49fbcacaf972a7b919d1544c5e3c83389fc3291
Parents: 5c61227
Author: Sela <an...@paypal.com>
Authored: Sun Dec 18 14:36:53 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:05:34 2017 +0100
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 4 +-
.../runners/spark/io/SparkUnboundedSource.java | 127 +++++++++++++------
.../spark/stateful/StateSpecFunctions.java | 37 ++++--
3 files changed, 121 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index f42cca3..ff818a1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -258,8 +258,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
return (CheckpointMarkT) reader.getCheckpointMark();
}
- public long getNumRecordsRead() {
- return recordsRead;
+ public Instant getWatermark() {
+ return reader.getWatermark();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 394b023..f03dc8c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -18,16 +18,20 @@
package org.apache.beam.runners.spark.io;
+import java.io.Serializable;
import java.util.Collections;
-import java.util.Iterator;
import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StateSpec;
@@ -39,6 +43,10 @@ import org.apache.spark.streaming.api.java.JavaPairInputDStream$;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.scheduler.StreamInputInfo;
+import org.joda.time.Instant;
+
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
/**
@@ -75,20 +83,39 @@ public class SparkUnboundedSource {
// call mapWithState to read from a checkpointable sources.
JavaMapWithStateDStream<Source<T>, CheckpointMarkT, byte[],
- Iterator<WindowedValue<T>>> mapWithStateDStream = inputDStream.mapWithState(
+ Tuple2<Iterable<byte[]>, Metadata>> mapWithStateDStream = inputDStream.mapWithState(
StateSpec.function(StateSpecFunctions.<T, CheckpointMarkT>mapSourceFunction(rc)));
// set checkpoint duration for read stream, if set.
checkpointStream(mapWithStateDStream, options);
- // flatmap and report read elements. Use the inputDStream's id to tie between the reported
- // info and the inputDStream it originated from.
- int id = inputDStream.inputDStream().id();
- ReportingFlatMappedDStream<WindowedValue<T>> reportingFlatMappedDStream =
- new ReportingFlatMappedDStream<>(mapWithStateDStream.dstream(), id,
- getSourceName(source, id));
+ // cache since checkpointing is less frequent.
+ mapWithStateDStream.cache();
- return JavaDStream.fromDStream(reportingFlatMappedDStream,
- JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+ // report the number of input elements for this InputDStream to the InputInfoTracker.
+ int id = inputDStream.inputDStream().id();
+ JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(
+ new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() {
+ @Override
+ public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+ return t2._2();
+ }
+ });
+
+ // register the ReportingDStream op.
+ new ReportingDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
+
+ // output the actual (deserialized) stream.
+ WindowedValue.FullWindowedValueCoder<T> coder =
+ WindowedValue.FullWindowedValueCoder.of(
+ source.getDefaultOutputCoder(),
+ GlobalWindow.Coder.INSTANCE);
+ return mapWithStateDStream.flatMap(
+ new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
+ @Override
+ public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+ return t2._1();
+ }
+ }).map(CoderHelpers.fromByteFunction(coder));
}
private static <T> String getSourceName(Source<T> source, int id) {
@@ -111,20 +138,20 @@ public class SparkUnboundedSource {
}
/**
- * A flatMap DStream function that "flattens" the Iterators read by the
- * {@link MicrobatchSource.Reader}s, while reporting the properties of the read to the
- * {@link org.apache.spark.streaming.scheduler.InputInfoTracker} for RateControl purposes
- * and visibility.
+ * A DStream function that reports the properties of the read to the
+ * {@link org.apache.spark.streaming.scheduler.InputInfoTracker}
+ * for RateControl purposes and visibility.
*/
- private static class ReportingFlatMappedDStream<T> extends DStream<T> {
- private final DStream<Iterator<T>> parent;
+ private static class ReportingDStream extends DStream<BoxedUnit> {
+ private final DStream<Metadata> parent;
private final int inputDStreamId;
private final String sourceName;
- ReportingFlatMappedDStream(DStream<Iterator<T>> parent,
- int inputDStreamId,
- String sourceName) {
- super(parent.ssc(), JavaSparkContext$.MODULE$.<T>fakeClassTag());
+ ReportingDStream(
+ DStream<Metadata> parent,
+ int inputDStreamId,
+ String sourceName) {
+ super(parent.ssc(), JavaSparkContext$.MODULE$.<BoxedUnit>fakeClassTag());
this.parent = parent;
this.inputDStreamId = inputDStreamId;
this.sourceName = sourceName;
@@ -142,31 +169,59 @@ public class SparkUnboundedSource {
}
@Override
- public scala.Option<RDD<T>> compute(Time validTime) {
+ public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
// compute parent.
- scala.Option<RDD<Iterator<T>>> computedParentRDD = parent.getOrCompute(validTime);
- // compute this DStream - take single-iterator partitions an flatMap them.
- if (computedParentRDD.isDefined()) {
- RDD<T> computedRDD = computedParentRDD.get().toJavaRDD()
- .flatMap(TranslationUtils.<T>flattenIter()).rdd().cache();
- // report - for RateEstimator and visibility.
- report(validTime, computedRDD.count());
- return scala.Option.apply(computedRDD);
- } else {
- report(validTime, 0);
- return scala.Option.empty();
+ scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
+ long count = 0;
+ Instant globalWatermark = new Instant(Long.MIN_VALUE);
+ if (parentRDDOpt.isDefined()) {
+ JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
+ for (Metadata metadata: parentRDD.collect()) {
+ count += metadata.getNumRecords();
+ // a monotonically increasing watermark.
+ globalWatermark = globalWatermark.isBefore(metadata.getWatermark())
+ ? metadata.getWatermark() : globalWatermark;
+ }
}
+ // report - for RateEstimator and visibility.
+ report(validTime, count, globalWatermark);
+ return scala.Option.empty();
}
- private void report(Time batchTime, long count) {
+ private void report(Time batchTime, long count, Instant watermark) {
// metadata - #records read and a description.
scala.collection.immutable.Map<String, Object> metadata =
new scala.collection.immutable.Map.Map1<String, Object>(
StreamInputInfo.METADATA_KEY_DESCRIPTION(),
- String.format("Read %d records from %s for batch time: %s", count, sourceName,
- batchTime));
+ String.format(
+ "Read %d records with observed watermark %s, from %s for batch time: %s",
+ count,
+ watermark,
+ sourceName,
+ batchTime));
StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
ssc().scheduler().inputInfoTracker().reportInfo(batchTime, streamInputInfo);
}
}
+
+ /**
+ * A metadata holder for an input stream partition.
+ */
+ public static class Metadata implements Serializable {
+ private final long numRecords;
+ private final Instant watermark;
+
+ public Metadata(long numRecords, Instant watermark) {
+ this.numRecords = numRecords;
+ this.watermark = watermark;
+ }
+
+ public long getNumRecords() {
+ return numRecords;
+ }
+
+ public Instant getWatermark() {
+ return watermark;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a49fbcac/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 053f4ac..ffe0ddd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
import org.apache.beam.runners.spark.io.MicrobatchSource;
+import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -39,10 +40,12 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
+import scala.Tuple2;
import scala.runtime.AbstractFunction3;
/**
@@ -92,14 +95,17 @@ public class StateSpecFunctions {
*/
public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>,
- Iterator<WindowedValue<T>>> mapSourceFunction(final SparkRuntimeContext runtimeContext) {
+ Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
+ final SparkRuntimeContext runtimeContext) {
return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>,
- Iterator<WindowedValue<T>>>() {
+ Tuple2<Iterable<byte[]>, Metadata>>() {
@Override
- public Iterator<WindowedValue<T>> apply(Source<T> source, scala.Option<CheckpointMarkT>
- startCheckpointMark, State<byte[]> state) {
+ public Tuple2<Iterable<byte[]>, Metadata> apply(
+ Source<T> source,
+ scala.Option<CheckpointMarkT> startCheckpointMark,
+ State<byte[]> state) {
// source as MicrobatchSource
MicrobatchSource<T, CheckpointMarkT> microbatchSource =
(MicrobatchSource<T, CheckpointMarkT>) source;
@@ -130,18 +136,25 @@ public class StateSpecFunctions {
throw new RuntimeException(e);
}
- // read microbatch.
- final List<WindowedValue<T>> readValues = new ArrayList<>();
+ // read microbatch as a serialized collection.
+ final List<byte[]> readValues = new ArrayList<>();
+ final Instant watermark;
+ WindowedValue.FullWindowedValueCoder<T> coder =
+ WindowedValue.FullWindowedValueCoder.of(
+ source.getDefaultOutputCoder(),
+ GlobalWindow.Coder.INSTANCE);
try {
// measure how long a read takes per-partition.
Stopwatch stopwatch = Stopwatch.createStarted();
boolean finished = !reader.start();
while (!finished) {
- readValues.add(WindowedValue.of(reader.getCurrent(), reader.getCurrentTimestamp(),
- GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+ WindowedValue<T> wv = WindowedValue.of(reader.getCurrent(),
+ reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ readValues.add(CoderHelpers.toByteArray(wv, coder));
finished = !reader.advance();
}
+ watermark = ((MicrobatchSource.Reader) reader).getWatermark();
// close and checkpoint reader.
reader.close();
LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
@@ -160,7 +173,13 @@ public class StateSpecFunctions {
throw new RuntimeException("Failed to read from reader.", e);
}
- return Iterators.unmodifiableIterator(readValues.iterator());
+ Iterable <byte[]> iterable = new Iterable<byte[]>() {
+ @Override
+ public Iterator<byte[]> iterator() {
+ return Iterators.unmodifiableIterator(readValues.iterator());
+ }
+ };
+ return new Tuple2<>(iterable, new Metadata(readValues.size(), watermark));
}
};
}
[2/2] beam git commit: [BEAM-1177] This closes #1654
Posted by jb...@apache.org.
[BEAM-1177] This closes #1654
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1d85dfc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1d85dfc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1d85dfc
Branch: refs/heads/master
Commit: d1d85dfc76574e925dbefe9dfbf6edb8bcab8597
Parents: 5c61227 a49fbca
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jan 4 13:07:26 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jan 4 13:07:26 2017 +0100
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 4 +-
.../runners/spark/io/SparkUnboundedSource.java | 127 +++++++++++++------
.../spark/stateful/StateSpecFunctions.java | 37 ++++--
3 files changed, 121 insertions(+), 47 deletions(-)
----------------------------------------------------------------------