You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 20:11:18 UTC
[14/50] incubator-beam git commit: Refactor SparkProcessContext more cleanly into single- and multi-output versions
Refactor SparkProcessContext more cleanly into single- and multi-output versions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8243fcdc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8243fcdc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8243fcdc
Branch: refs/heads/python-sdk
Commit: 8243fcdc4e80838589622bd2f0bbe51350da8c8a
Parents: 1fb1f7b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Nov 11 16:57:27 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/OutputWindowedValue.java | 1 -
.../runners/core/ReduceFnContextFactory.java | 1 -
.../runners/spark/translation/DoFnFunction.java | 9 +++-
.../spark/translation/MultiDoFnFunction.java | 17 +-----
.../spark/translation/SparkProcessContext.java | 57 ++++++++++----------
5 files changed, 39 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
index 08a0e81..86eeb33 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 668ef47..d43fb8e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.MergingStateAccessor;
import org.apache.beam.sdk.util.state.ReadableState;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 4dfbee6..fa08c5b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.FlatMapFunction;
-
+import org.joda.time.Instant;
/**
@@ -93,6 +93,13 @@ public class DoFnFunction<InputT, OutputT>
}
@Override
+ public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+ throw new UnsupportedOperationException(
+ "sideOutput is an unsupported operation for doFunctions, use a "
+ + "MultiDoFunction instead.");
+ }
+
+ @Override
public Accumulator<NamedAggregators> getAccumulator() {
return accum;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 1168381..d015b08 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -103,21 +103,8 @@ public class MultiDoFnFunction<InputT, OutputT>
}
@Override
- public synchronized <T> void sideOutput(TupleTag<T> tag, T t) {
- sideOutputWithTimestamp(tag, t, windowedValue != null ? windowedValue.getTimestamp() : null);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag,
- final T t,
- final Instant timestamp) {
- if (windowedValue == null) {
- // this is start/finishBundle.
- outputs.put(tupleTag, noElementWindowedValue(t, timestamp, windowFn));
- } else {
- outputs.put(tupleTag, WindowedValue.of(t, timestamp, windowedValue.getWindows(),
- windowedValue.getPane()));
- }
+ public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+ outputs.put(tag, output);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index f3152ba..afbc824 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -127,34 +127,10 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
}
@Override
- public void output(OutputT output) {
- outputWithTimestamp(output, windowedValue != null ? windowedValue.getTimestamp() : null);
- }
-
- public abstract void output(WindowedValue<OutputT> output);
-
- @Override
- public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
- String message = "sideOutput is an unsupported operation for doFunctions, use a "
- + "MultiDoFunction instead.";
- LOG.warn(message);
- throw new UnsupportedOperationException(message);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
- String message =
- "sideOutputWithTimestamp is an unsupported operation for doFunctions, use a "
- + "MultiDoFunction instead.";
- LOG.warn(message);
- throw new UnsupportedOperationException(message);
- }
-
- @Override
public <AggregatorInputT, AggregatorOutputT>
Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal(
- String named,
- Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
+ String named,
+ Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
return mRuntimeContext.createAggregator(getAccumulator(), named, combineFn);
}
@@ -166,6 +142,11 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
}
@Override
+ public void output(OutputT output) {
+ outputWithTimestamp(output, windowedValue != null ? windowedValue.getTimestamp() : null);
+ }
+
+ @Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
if (windowedValue == null) {
// this is start/finishBundle.
@@ -176,6 +157,26 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
}
}
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ sideOutputWithTimestamp(
+ tag, output, windowedValue != null ? windowedValue.getTimestamp() : null);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ if (windowedValue == null) {
+ // this is start/finishBundle.
+ sideOutput(tag, noElementWindowedValue(output, timestamp, windowFn));
+ } else {
+ sideOutput(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(),
+ windowedValue.getPane()));
+ }
+ }
+
+ public abstract void output(WindowedValue<OutputT> output);
+ public abstract <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output);
+
static <T, W extends BoundedWindow> WindowedValue<T> noElementWindowedValue(
final T output, final Instant timestamp, WindowFn<Object, W> windowFn) {
WindowFn<Object, W>.AssignContext assignContext =
@@ -248,8 +249,8 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
@Override
public <SideOutputT> void sideOutputWindowedValue(
TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane) {
- throw new UnsupportedOperationException();
+ Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
+ sideOutput(tag, WindowedValue.of(output, timestamp, windows, paneInfo));
}
@Override