You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by st...@apache.org on 2017/03/30 11:54:46 UTC
[1/2] beam git commit: Extracted captures to static classes to prevent them from capturing the scope.
Repository: beam
Updated Branches:
refs/heads/master 769398e40 -> 2a40534e8
Extracted captures to static classes to prevent them from capturing the scope.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3876f83a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3876f83a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3876f83a
Branch: refs/heads/master
Commit: 3876f83a82845e3c0a41152cf7a7c58378d994e7
Parents: 769398e
Author: Stas Levin <st...@apache.org>
Authored: Wed Mar 29 15:29:20 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Mar 30 14:53:38 2017 +0300
----------------------------------------------------------------------
.../runners/spark/io/SparkUnboundedSource.java | 37 ++++++++++++--------
.../spark/stateful/StateSpecFunctions.java | 17 +++++----
2 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index a538907..162bca4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -102,13 +102,7 @@ public class SparkUnboundedSource {
// report the number of input elements for this InputDStream to the InputInfoTracker.
int id = inputDStream.inputDStream().id();
- JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(
- new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() {
- @Override
- public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
- return t2._2();
- }
- });
+ JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction());
// register ReadReportDStream to report information related to this read.
new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
@@ -118,13 +112,10 @@ public class SparkUnboundedSource {
WindowedValue.FullWindowedValueCoder.of(
source.getDefaultOutputCoder(),
GlobalWindow.Coder.INSTANCE);
- JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap(
- new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
- @Override
- public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
- return t2._1();
- }
- }).map(CoderHelpers.fromByteFunction(coder));
+ JavaDStream<WindowedValue<T>> readUnboundedStream =
+ mapWithStateDStream
+ .flatMap(new Tuple2byteFlatMapFunction())
+ .map(CoderHelpers.fromByteFunction(coder));
return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id));
}
@@ -274,4 +265,22 @@ public class SparkUnboundedSource {
return metricsContainer;
}
}
+
+ private static class Tuple2MetadataFunction
+ implements Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata> {
+
+ @Override
+ public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+ return t2._2();
+ }
+ }
+
+ private static class Tuple2byteFlatMapFunction
+ implements FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]> {
+
+ @Override
+ public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+ return t2._1();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index ec4fce3..803fe45 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.spark.stateful;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -197,14 +197,13 @@ public class StateSpecFunctions {
throw new RuntimeException("Failed to read from reader.", e);
}
- Iterable <byte[]> iterable = new Iterable<byte[]>() {
- @Override
- public Iterator<byte[]> iterator() {
- return Iterators.unmodifiableIterator(readValues.iterator());
- }
- };
- return new Tuple2<>(iterable,
- new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer));
+ final ArrayList<byte[]> payload =
+ Lists.newArrayList(Iterators.unmodifiableIterator(readValues.iterator()));
+
+ return new Tuple2<>(
+ (Iterable<byte[]>) payload,
+ new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer));
+
} catch (IOException e) {
throw new RuntimeException(e);
}
[2/2] beam git commit: This closes #2356
Posted by st...@apache.org.
This closes #2356
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a40534e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a40534e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a40534e
Branch: refs/heads/master
Commit: 2a40534e80fb84d969ac16bd0d62618109ee04b4
Parents: 769398e 3876f83
Author: Stas Levin <st...@apache.org>
Authored: Thu Mar 30 14:53:57 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Mar 30 14:53:57 2017 +0300
----------------------------------------------------------------------
.../runners/spark/io/SparkUnboundedSource.java | 37 ++++++++++++--------
.../spark/stateful/StateSpecFunctions.java | 17 +++++----
2 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------