Posted to commits@beam.apache.org by st...@apache.org on 2017/03/30 11:54:46 UTC

[1/2] beam git commit: Extracted anonymous classes into static nested classes to prevent them from capturing the enclosing scope.

Repository: beam
Updated Branches:
  refs/heads/master 769398e40 -> 2a40534e8


Extracted anonymous classes into static nested classes to prevent them from capturing the enclosing scope.
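
For context: Spark requires the functions passed to its transformations to be Java-serializable. An anonymous inner class declared in an instance context reaches enclosing state through a hidden reference to the enclosing instance, so serializing the function drags that whole instance (and whatever it holds) along with it; a static nested class carries only what it is explicitly given. The standalone sketch below uses hypothetical names and is not part of this commit; it only illustrates the general Java behaviour the commit message refers to.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

  /** Stand-in for Spark's serializable function interface (hypothetical). */
  interface SerializableFunction<T, R> extends Serializable {
    R apply(T input);
  }

  // CaptureDemo itself is deliberately not Serializable, like most classes
  // that hold readers, contexts or other heavyweight state.
  private final int delta = 1;

  /** Anonymous inner class: reading 'delta' captures CaptureDemo.this. */
  SerializableFunction<Integer, Integer> anonymousIncrement() {
    return new SerializableFunction<Integer, Integer>() {
      @Override
      public Integer apply(Integer x) {
        return x + delta; // implicitly CaptureDemo.this.delta
      }
    };
  }

  /** Static nested class: no hidden reference to the enclosing instance. */
  private static class StaticIncrement implements SerializableFunction<Integer, Integer> {
    private final int delta;

    StaticIncrement(int delta) {
      this.delta = delta;
    }

    @Override
    public Integer apply(Integer x) {
      return x + delta;
    }
  }

  private static void trySerialize(String label, Object function) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(function);
      System.out.println(label + ": serialized OK");
    } catch (IOException e) {
      // The anonymous version fails with NotSerializableException: CaptureDemo.
      System.out.println(label + ": " + e);
    }
  }

  public static void main(String[] args) {
    CaptureDemo demo = new CaptureDemo();
    trySerialize("anonymous inner class", demo.anonymousIncrement());
    trySerialize("static nested class  ", new StaticIncrement(1));
  }
}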


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3876f83a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3876f83a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3876f83a

Branch: refs/heads/master
Commit: 3876f83a82845e3c0a41152cf7a7c58378d994e7
Parents: 769398e
Author: Stas Levin <st...@apache.org>
Authored: Wed Mar 29 15:29:20 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Mar 30 14:53:38 2017 +0300

----------------------------------------------------------------------
 .../runners/spark/io/SparkUnboundedSource.java  | 37 ++++++++++++--------
 .../spark/stateful/StateSpecFunctions.java      | 17 +++++----
 2 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index a538907..162bca4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -102,13 +102,7 @@ public class SparkUnboundedSource {
 
     // report the number of input elements for this InputDStream to the InputInfoTracker.
     int id = inputDStream.inputDStream().id();
-    JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(
-        new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() {
-          @Override
-          public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
-            return t2._2();
-          }
-        });
+    JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction());
 
     // register ReadReportDStream to report information related to this read.
     new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
@@ -118,13 +112,10 @@ public class SparkUnboundedSource {
         WindowedValue.FullWindowedValueCoder.of(
             source.getDefaultOutputCoder(),
             GlobalWindow.Coder.INSTANCE);
-    JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap(
-        new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
-          @Override
-          public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
-            return t2._1();
-          }
-        }).map(CoderHelpers.fromByteFunction(coder));
+    JavaDStream<WindowedValue<T>> readUnboundedStream =
+        mapWithStateDStream
+            .flatMap(new Tuple2byteFlatMapFunction())
+            .map(CoderHelpers.fromByteFunction(coder));
     return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id));
   }
 
@@ -274,4 +265,22 @@ public class SparkUnboundedSource {
       return metricsContainer;
     }
   }
+
+  private static class Tuple2MetadataFunction
+      implements Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata> {
+
+    @Override
+    public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+      return t2._2();
+    }
+  }
+
+  private static class Tuple2byteFlatMapFunction
+      implements FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]> {
+
+    @Override
+    public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+      return t2._1();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index ec4fce3..803fe45 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.spark.stateful;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -197,14 +197,13 @@ public class StateSpecFunctions {
           throw new RuntimeException("Failed to read from reader.", e);
         }
 
-          Iterable <byte[]> iterable = new Iterable<byte[]>() {
-            @Override
-            public Iterator<byte[]> iterator() {
-              return Iterators.unmodifiableIterator(readValues.iterator());
-            }
-          };
-          return new Tuple2<>(iterable,
-              new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer));
+        final ArrayList<byte[]> payload =
+            Lists.newArrayList(Iterators.unmodifiableIterator(readValues.iterator()));
+
+        return new Tuple2<>(
+            (Iterable<byte[]>) payload,
+            new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer));
+
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
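
For the StateSpecFunctions change above, the reasoning is similar but applies to data rather than functions: the removed code returned an anonymous Iterable that lazily wrapped readValues, so the returned Tuple2 kept the anonymous class (and whatever it could reach in the surrounding scope) alive, and the wrapper itself is not serializable. The new code copies the values into a plain ArrayList once, so the tuple Spark keeps in its mapWithState state carries only the data. A minimal sketch of the two shapes, with hypothetical names, assuming Guava (Iterators, Lists) is on the classpath as in the file above:

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LazyVsMaterialized {

  // Before: a lazy view; the anonymous class holds on to readValues (and its
  // surrounding scope) for as long as the Iterable itself is retained.
  static Iterable<byte[]> lazyView(final List<byte[]> readValues) {
    return new Iterable<byte[]>() {
      @Override
      public Iterator<byte[]> iterator() {
        return Iterators.unmodifiableIterator(readValues.iterator());
      }
    };
  }

  // After: copy the elements once; the result is a plain, serializable
  // ArrayList that references nothing but the data itself.
  static Iterable<byte[]> materialized(List<byte[]> readValues) {
    ArrayList<byte[]> payload =
        Lists.newArrayList(Iterators.unmodifiableIterator(readValues.iterator()));
    return payload;
  }
}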


[2/2] beam git commit: This closes #2356

Posted by st...@apache.org.
This closes #2356


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a40534e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a40534e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a40534e

Branch: refs/heads/master
Commit: 2a40534e80fb84d969ac16bd0d62618109ee04b4
Parents: 769398e 3876f83
Author: Stas Levin <st...@apache.org>
Authored: Thu Mar 30 14:53:57 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Mar 30 14:53:57 2017 +0300

----------------------------------------------------------------------
 .../runners/spark/io/SparkUnboundedSource.java  | 37 ++++++++++++--------
 .../spark/stateful/StateSpecFunctions.java      | 17 +++++----
 2 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------