Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:40 UTC
[15/50] [abbrv] incubator-beam git commit: Only accumulate outputs
from one call to processElement, rather than for the whole partition.
Only accumulate outputs from one call to processElement, rather than
for the whole partition.
Fixes https://github.com/cloudera/spark-dataflow/issues/61.
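To illustrate the memory issue this addresses: before this change the Spark functions buffered every output produced across the whole partition in one collection before returning it; after it, outputs are produced lazily, so only the outputs of the element currently being processed are held. A minimal, self-contained sketch of the two strategies (plain JDK, with a hypothetical BiConsumer standing in for the DoFn; not the runner's actual code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;

public class LazyOutputSketch {

  // Eager (old behaviour): every output for the whole partition is buffered.
  static <I, O> Iterable<O> eager(Iterator<I> input, BiConsumer<I, List<O>> fn) {
    List<O> all = new ArrayList<>();
    while (input.hasNext()) {
      fn.accept(input.next(), all);      // outputs for every element pile up here
    }
    return all;
  }

  // Lazy (new behaviour): only the current element's outputs are held in memory.
  static <I, O> Iterable<O> lazy(Iterator<I> input, BiConsumer<I, List<O>> fn) {
    return () -> new Iterator<O>() {
      private Iterator<O> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        while (!current.hasNext() && input.hasNext()) {
          List<O> buffer = new ArrayList<>();   // reset for each input element
          fn.accept(input.next(), buffer);
          current = buffer.iterator();
        }
        return current.hasNext();
      }

      @Override
      public O next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }

  public static void main(String[] args) {
    Iterator<Integer> partition = Arrays.asList(1, 2, 3).iterator();
    // Each input element emits itself and ten times itself, like a small flatMap.
    for (int o : lazy(partition, (Integer i, List<Integer> out) -> {
      out.add(i);
      out.add(i * 10);
    })) {
      System.out.println(o);
    }
  }
}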
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3cae69be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3cae69be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3cae69be
Branch: refs/heads/master
Commit: 3cae69bea6060c3ccf274a0444a93df3f8bc61b8
Parents: 79b08ad
Author: Tom White <to...@cloudera.com>
Authored: Fri Jul 10 16:41:30 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:15 2016 +0000
----------------------------------------------------------------------
.../cloudera/dataflow/spark/DoFnFunction.java | 22 +++----
.../dataflow/spark/MultiDoFnFunction.java | 36 ++++++------
.../dataflow/spark/SparkProcessContext.java | 60 +++++++++++++++++++-
3 files changed, 90 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3cae69be/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
index c5d7ddc..ae3dd79 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
@@ -51,20 +51,12 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
@Override
public Iterable<O> call(Iterator<I> iter) throws Exception {
ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
- //setup
mFunction.startBundle(ctxt);
ctxt.setup();
- //operation
- while (iter.hasNext()) {
- ctxt.element = iter.next();
- mFunction.processElement(ctxt);
- }
- //cleanup
- mFunction.finishBundle(ctxt);
- return ctxt.outputs;
+ return ctxt.getOutputIterable(iter, mFunction);
}
- private class ProcCtxt extends SparkProcessContext<I, O> {
+ private class ProcCtxt extends SparkProcessContext<I, O, O> {
private final List<O> outputs = new LinkedList<>();
@@ -77,5 +69,15 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
public synchronized void output(O o) {
outputs.add(o);
}
+
+ @Override
+ protected void clearOutput() {
+ outputs.clear();
+ }
+
+ protected Iterator<O> getOutputIterator() {
+ return outputs.iterator();
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3cae69be/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
index e6cb664..8a9f8d5 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
@@ -21,7 +21,7 @@ import java.util.Map;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -55,26 +55,12 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
@Override
public Iterable<Tuple2<TupleTag<?>, Object>> call(Iterator<I> iter) throws Exception {
ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
- //setup
mFunction.startBundle(ctxt);
ctxt.setup();
- //operation
- while (iter.hasNext()) {
- ctxt.element = iter.next();
- mFunction.processElement(ctxt);
- }
- //cleanup
- mFunction.finishBundle(ctxt);
- return Iterables.transform(ctxt.outputs.entries(),
- new Function<Map.Entry<TupleTag<?>, Object>, Tuple2<TupleTag<?>, Object>>() {
- @Override
- public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {
- return new Tuple2<TupleTag<?>, Object>(input.getKey(), input.getValue());
- }
- });
+ return ctxt.getOutputIterable(iter, mFunction);
}
- private class ProcCtxt extends SparkProcessContext<I, O> {
+ private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, Object>> {
private final Multimap<TupleTag<?>, Object> outputs = LinkedListMultimap.create();
@@ -97,5 +83,21 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
outputs.put(tupleTag, t);
}
+
+ @Override
+ protected void clearOutput() {
+ outputs.clear();
+ }
+
+ protected Iterator<Tuple2<TupleTag<?>, Object>> getOutputIterator() {
+ return Iterators.transform(outputs.entries().iterator(),
+ new Function<Map.Entry<TupleTag<?>, Object>, Tuple2<TupleTag<?>, Object>>() {
+ @Override
+ public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {
+ return new Tuple2<TupleTag<?>, Object>(input.getKey(), input.getValue());
+ }
+ });
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3cae69be/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index 12fb4e0..ee51c35 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -18,6 +18,7 @@ package com.cloudera.dataflow.spark;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -34,11 +35,12 @@ import com.google.cloud.dataflow.sdk.util.WindowingInternals;
import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.AbstractIterator;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext {
+abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
@@ -192,4 +194,60 @@ abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext {
};
}
+ protected abstract void clearOutput();
+ protected abstract Iterator<V> getOutputIterator();
+
+ protected Iterable<V> getOutputIterable(final Iterator<I> iter, final DoFn<I, O> doFn) {
+ return new Iterable<V>() {
+ @Override
+ public Iterator<V> iterator() {
+ return new ProcCtxtIterator(iter, doFn);
+ }
+ };
+ }
+
+ private class ProcCtxtIterator extends AbstractIterator<V> {
+
+ private final Iterator<I> inputIterator;
+ private final DoFn<I, O> doFn;
+ private Iterator<V> outputIterator;
+
+ public ProcCtxtIterator(Iterator<I> iterator, DoFn<I, O> doFn) {
+ this.inputIterator = iterator;
+ this.doFn = doFn;
+ }
+
+ @Override
+ protected V computeNext() {
+ // Process each element from the (input) iterator, which produces zero, one or more
+ // output elements (of type V) in the output iterator. Note that the output
+ // collection (and iterator) is reset between each call to processElement, so the
+ // collection only holds the output values for each call to processElement, rather
+ // than for the whole partition (which would use too much memory).
+ while (true) {
+ if (outputIterator != null && outputIterator.hasNext()) {
+ return outputIterator.next();
+ }
+ if (inputIterator.hasNext()) {
+ clearOutput();
+ element = inputIterator.next();
+ try {
+ doFn.processElement(SparkProcessContext.this);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ outputIterator = getOutputIterator();
+ continue; // try to consume outputIterator from start of loop
+ } else {
+ try {
+ doFn.finishBundle(SparkProcessContext.this);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ return endOfData();
+ }
+ }
+ }
+ }
+
}
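The ProcCtxtIterator added above drives the DoFn lazily from Spark's partition iterator: computeNext() first drains any outputs left over from the previous element, otherwise clears the output buffer, processes the next input element, and only calls finishBundle() once the input is exhausted. A stripped-down sketch of the same AbstractIterator state machine, with a hypothetical ElementFn standing in for the DoFn and ignoring windowing, side outputs and finishBundle outputs (assumes Guava on the classpath; these are not the runner's actual classes):

import com.google.common.collect.AbstractIterator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class PerElementOutputIterator<I, O> extends AbstractIterator<O> {

  // Hypothetical stand-in for DoFn: emits zero, one or more outputs per element.
  interface ElementFn<I, O> {
    void process(I element, List<O> out);
    default void finishBundle() {}     // end-of-bundle hook, called once
  }

  private final Iterator<I> input;
  private final ElementFn<I, O> fn;
  private final List<O> buffer = new ArrayList<>();          // one element's outputs
  private Iterator<O> outputs = Collections.emptyIterator();

  PerElementOutputIterator(Iterator<I> input, ElementFn<I, O> fn) {
    this.input = input;
    this.fn = fn;
  }

  @Override
  protected O computeNext() {
    while (true) {
      if (outputs.hasNext()) {
        return outputs.next();         // drain the current element's outputs
      }
      if (input.hasNext()) {
        buffer.clear();                // like clearOutput(): reset per element
        fn.process(input.next(), buffer);
        outputs = buffer.iterator();   // loop back and start draining
      } else {
        fn.finishBundle();             // input exhausted; finish the bundle
        return endOfData();
      }
    }
  }

  public static void main(String[] args) {
    Iterator<String> partition = Arrays.asList("a", "bb", "ccc").iterator();
    Iterator<Integer> lengths = new PerElementOutputIterator<>(
        partition, (ElementFn<String, Integer>) (s, out) -> out.add(s.length()));
    while (lengths.hasNext()) {
      System.out.println(lengths.next());
    }
  }
}

Draining each element's outputs before touching the next input is what keeps the memory footprint proportional to a single element's fan-out rather than to the partition size.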