Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:40 UTC

[15/50] [abbrv] incubator-beam git commit: Only accumulate outputs from one call to processElement, rather than for the whole partition.

Only accumulate outputs from one call to processElement, rather than
for the whole partition.

Fixes https://github.com/cloudera/spark-dataflow/issues/61.
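
For context, both DoFnFunction and MultiDoFnFunction previously buffered
every output for the whole partition before returning it, roughly
(condensed from the removed lines in the diff below):

    //operation (old behaviour: buffers the whole partition)
    while (iter.hasNext()) {
      ctxt.element = iter.next();
      mFunction.processElement(ctxt); // appends to ctxt.outputs
    }
    //cleanup
    mFunction.finishBundle(ctxt);
    return ctxt.outputs; // memory grows with the partition size

The change replaces that loop with a lazy Iterable that clears the output
buffer between elements, so memory use is bounded by the output of a single
processElement call.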


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3cae69be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3cae69be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3cae69be

Branch: refs/heads/master
Commit: 3cae69bea6060c3ccf274a0444a93df3f8bc61b8
Parents: 79b08ad
Author: Tom White <to...@cloudera.com>
Authored: Fri Jul 10 16:41:30 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:15 2016 +0000

----------------------------------------------------------------------
 .../cloudera/dataflow/spark/DoFnFunction.java   | 22 +++----
 .../dataflow/spark/MultiDoFnFunction.java       | 36 ++++++------
 .../dataflow/spark/SparkProcessContext.java     | 60 +++++++++++++++++++-
 3 files changed, 90 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3cae69be/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
index c5d7ddc..ae3dd79 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
@@ -51,20 +51,12 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
   @Override
   public Iterable<O> call(Iterator<I> iter) throws Exception {
     ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    //setup
     mFunction.startBundle(ctxt);
     ctxt.setup();
-    //operation
-    while (iter.hasNext()) {
-      ctxt.element = iter.next();
-      mFunction.processElement(ctxt);
-    }
-    //cleanup
-    mFunction.finishBundle(ctxt);
-    return ctxt.outputs;
+    return ctxt.getOutputIterable(iter, mFunction);
   }
 
-  private class ProcCtxt extends SparkProcessContext<I, O> {
+  private class ProcCtxt extends SparkProcessContext<I, O, O> {
 
     private final List<O> outputs = new LinkedList<>();
 
@@ -77,5 +69,15 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
     public synchronized void output(O o) {
       outputs.add(o);
     }
+
+    @Override
+    protected void clearOutput() {
+      outputs.clear();
+    }
+
+    @Override
+    protected Iterator<O> getOutputIterator() {
+      return outputs.iterator();
+    }
   }
+
 }
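
A consequence of the new shape worth noting: call() now returns before any
input is consumed. startBundle still runs eagerly, but processElement and
finishBundle fire only as Spark drains the returned iterator. A
self-contained toy showing that eager-vs-lazy ordering (toy names, not
runner code):

    import java.util.Arrays;
    import java.util.Iterator;

    public class LazySketch {
      public static void main(String[] args) {
        final Iterator<String> inputs = Arrays.asList("a", "b").iterator();
        // Returned immediately; no input has been consumed yet.
        Iterable<String> outputs = new Iterable<String>() {
          @Override
          public Iterator<String> iterator() {
            return new Iterator<String>() {
              @Override public boolean hasNext() { return inputs.hasNext(); }
              @Override public String next() {
                String in = inputs.next();
                System.out.println("processing " + in + " (lazy)");
                return in.toUpperCase();
              }
              @Override public void remove() {
                throw new UnsupportedOperationException();
              }
            };
          }
        };
        System.out.println("returned before any processing");
        for (String o : outputs) { // elements are produced on demand here
          System.out.println("got " + o);
        }
      }
    }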

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3cae69be/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
index e6cb664..8a9f8d5 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
@@ -21,7 +21,7 @@ import java.util.Map;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -55,26 +55,12 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
   @Override
   public Iterable<Tuple2<TupleTag<?>, Object>> call(Iterator<I> iter) throws Exception {
     ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    //setup
     mFunction.startBundle(ctxt);
     ctxt.setup();
-    //operation
-    while (iter.hasNext()) {
-      ctxt.element = iter.next();
-      mFunction.processElement(ctxt);
-    }
-    //cleanup
-    mFunction.finishBundle(ctxt);
-    return Iterables.transform(ctxt.outputs.entries(),
-        new Function<Map.Entry<TupleTag<?>, Object>, Tuple2<TupleTag<?>, Object>>() {
-          @Override
-          public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {
-            return new Tuple2<TupleTag<?>, Object>(input.getKey(), input.getValue());
-          }
-        });
+    return ctxt.getOutputIterable(iter, mFunction);
   }
 
-  private class ProcCtxt extends SparkProcessContext<I, O> {
+  private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, Object>> {
 
     private final Multimap<TupleTag<?>, Object> outputs = LinkedListMultimap.create();
 
@@ -97,5 +83,21 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
     public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
       outputs.put(tupleTag, t);
     }
+
+    @Override
+    protected void clearOutput() {
+      outputs.clear();
+    }
+
+    @Override
+    protected Iterator<Tuple2<TupleTag<?>, Object>> getOutputIterator() {
+      return Iterators.transform(outputs.entries().iterator(),
+          new Function<Map.Entry<TupleTag<?>, Object>, Tuple2<TupleTag<?>, Object>>() {
+        @Override
+        public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {
+          return new Tuple2<TupleTag<?>, Object>(input.getKey(), input.getValue());
+        }
+      });
+    }
+
   }
 }
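
The import swap from Iterables.transform to Iterators.transform is
deliberate: the Multimap is cleared between elements, so a fresh
transformed view has to be built on each getOutputIterator() call rather
than once over a finished collection. A standalone sketch of the same lazy
entry transform (toy tags, with a String standing in for scala.Tuple2):

    import java.util.Iterator;
    import java.util.Map;

    import com.google.common.base.Function;
    import com.google.common.collect.Iterators;
    import com.google.common.collect.LinkedListMultimap;
    import com.google.common.collect.Multimap;

    public class TransformSketch {
      public static void main(String[] args) {
        Multimap<String, Object> outputs = LinkedListMultimap.create();
        outputs.put("main", "a");
        outputs.put("side", 1);
        // Adapts the live entry iterator lazily; nothing is copied.
        Iterator<String> tagged = Iterators.transform(
            outputs.entries().iterator(),
            new Function<Map.Entry<String, Object>, String>() {
              @Override
              public String apply(Map.Entry<String, Object> e) {
                return e.getKey() + "=" + e.getValue();
              }
            });
        while (tagged.hasNext()) {
          System.out.println(tagged.next()); // main=a, then side=1
        }
      }
    }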

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3cae69be/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index 12fb4e0..ee51c35 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -18,6 +18,7 @@ package com.cloudera.dataflow.spark;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -34,11 +35,12 @@ import com.google.cloud.dataflow.sdk.util.WindowingInternals;
 import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.AbstractIterator;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext {
+abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
 
@@ -192,4 +194,60 @@ abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext {
     };
   }
 
+  protected abstract void clearOutput();
+  protected abstract Iterator<V> getOutputIterator();
+
+  protected Iterable<V> getOutputIterable(final Iterator<I> iter, final DoFn<I, O> doFn) {
+    return new Iterable<V>() {
+      @Override
+      public Iterator<V> iterator() {
+        return new ProcCtxtIterator(iter, doFn);
+      }
+    };
+  }
+
+  private class ProcCtxtIterator extends AbstractIterator<V> {
+
+    private final Iterator<I> inputIterator;
+    private final DoFn<I, O> doFn;
+    private Iterator<V> outputIterator;
+
+    public ProcCtxtIterator(Iterator<I> iterator, DoFn<I, O> doFn) {
+      this.inputIterator = iterator;
+      this.doFn = doFn;
+    }
+
+    @Override
+    protected V computeNext() {
+      // Process each element from the (input) iterator, which produces
+      // zero, one or more output elements (of type V) in the output
+      // iterator. Note that the output collection (and iterator) is reset
+      // between calls to processElement, so the collection only holds the
+      // output values for a single call to processElement, rather than for
+      // the whole partition (which would use too much memory).
+      while (true) {
+        if (outputIterator != null && outputIterator.hasNext()) {
+          return outputIterator.next();
+        }
+        if (inputIterator.hasNext()) {
+          clearOutput();
+          element = inputIterator.next();
+          try {
+            doFn.processElement(SparkProcessContext.this);
+          } catch (Exception e) {
+            throw new IllegalStateException(e);
+          }
+          outputIterator = getOutputIterator();
+          continue; // try to consume outputIterator from start of loop
+        } else {
+          try {
+            doFn.finishBundle(SparkProcessContext.this);
+          } catch (Exception e) {
+            throw new IllegalStateException(e);
+          }
+          return endOfData();
+        }
+      }
+    }
+  }
+
 }
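
Guava's AbstractIterator does the hasNext/next bookkeeping: computeNext()
either returns the next value or calls endOfData(). A minimal standalone
analogue of ProcCtxtIterator's loop (toy types, Java 8 streams assumed; not
runner code):

    import java.util.Arrays;
    import java.util.Iterator;

    import com.google.common.collect.AbstractIterator;

    // Flatten one input at a time; only the outputs of the most recent
    // input are ever buffered, mirroring the per-element reset above.
    class Flattener extends AbstractIterator<Integer> {
      private final Iterator<int[]> inputs;
      private Iterator<Integer> current; // outputs of the last input

      Flattener(Iterator<int[]> inputs) {
        this.inputs = inputs;
      }

      @Override
      protected Integer computeNext() {
        while (true) {
          if (current != null && current.hasNext()) {
            return current.next(); // drain the last input's outputs
          }
          if (!inputs.hasNext()) {
            return endOfData(); // all inputs consumed
          }
          // "process" the next input (processElement in the runner)
          current = Arrays.stream(inputs.next()).iterator();
        }
      }
    }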