You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/23 06:51:59 UTC

[03/50] incubator-beam git commit: Refactor SparkProcessContext more cleanly into single- and multi-output versions

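Refactor SparkProcessContext more cleanly into single- and multi-output versions. SparkProcessContext now turns sideOutput/sideOutputWithTimestamp calls into WindowedValues and delegates them to a new abstract sideOutput(TupleTag, WindowedValue). DoFnFunction rejects that call with an UnsupportedOperationException, while MultiDoFnFunction records the value in its outputs.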


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8243fcdc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8243fcdc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8243fcdc

Branch: refs/heads/gearpump-runner
Commit: 8243fcdc4e80838589622bd2f0bbe51350da8c8a
Parents: 1fb1f7b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Nov 11 16:57:27 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/OutputWindowedValue.java  |  1 -
 .../runners/core/ReduceFnContextFactory.java    |  1 -
 .../runners/spark/translation/DoFnFunction.java |  9 +++-
 .../spark/translation/MultiDoFnFunction.java    | 17 +-----
 .../spark/translation/SparkProcessContext.java  | 57 ++++++++++----------
 5 files changed, 39 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
index 08a0e81..86eeb33 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
 import java.util.Collection;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 668ef47..d43fb8e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.ReadableState;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 4dfbee6..fa08c5b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
-
+import org.joda.time.Instant;
 
 
 /**
@@ -93,6 +93,13 @@ public class DoFnFunction<InputT, OutputT>
     }
 
     @Override
+    public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+      throw new UnsupportedOperationException(
+          "sideOutput is an unsupported operation for doFunctions, use a "
+              + "MultiDoFunction instead.");
+    }
+
+    @Override
     public Accumulator<NamedAggregators> getAccumulator() {
       return accum;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 1168381..d015b08 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -103,21 +103,8 @@ public class MultiDoFnFunction<InputT, OutputT>
     }
 
     @Override
-    public synchronized <T> void sideOutput(TupleTag<T> tag, T t) {
-      sideOutputWithTimestamp(tag, t, windowedValue != null ? windowedValue.getTimestamp() : null);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag,
-                                            final T t,
-                                            final Instant timestamp) {
-      if (windowedValue == null) {
-        // this is start/finishBundle.
-        outputs.put(tupleTag, noElementWindowedValue(t, timestamp, windowFn));
-      } else {
-        outputs.put(tupleTag, WindowedValue.of(t, timestamp, windowedValue.getWindows(),
-            windowedValue.getPane()));
-      }
+    public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) {
+      outputs.put(tag, output);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index f3152ba..afbc824 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -127,34 +127,10 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
   }
 
   @Override
-  public void output(OutputT output) {
-    outputWithTimestamp(output, windowedValue != null ? windowedValue.getTimestamp() : null);
-  }
-
-  public abstract void output(WindowedValue<OutputT> output);
-
-  @Override
-  public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
-    String message = "sideOutput is an unsupported operation for doFunctions, use a "
-        + "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
-    String message =
-        "sideOutputWithTimestamp is an unsupported operation for doFunctions, use a "
-            + "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
   public <AggregatorInputT, AggregatorOutputT>
   Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal(
-          String named,
-          Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
+      String named,
+      Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
     return mRuntimeContext.createAggregator(getAccumulator(), named, combineFn);
   }
 
@@ -166,6 +142,11 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
   }
 
   @Override
+  public void output(OutputT output) {
+    outputWithTimestamp(output, windowedValue != null ? windowedValue.getTimestamp() : null);
+  }
+
+  @Override
   public void outputWithTimestamp(OutputT output, Instant timestamp) {
     if (windowedValue == null) {
       // this is start/finishBundle.
@@ -176,6 +157,26 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
     }
   }
 
+  @Override
+  public <T> void sideOutput(TupleTag<T> tag, T output) {
+    sideOutputWithTimestamp(
+        tag, output, windowedValue != null ? windowedValue.getTimestamp() : null);
+  }
+
+  @Override
+  public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+    if (windowedValue == null) {
+      // this is start/finishBundle.
+      sideOutput(tag, noElementWindowedValue(output, timestamp, windowFn));
+    } else {
+      sideOutput(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(),
+          windowedValue.getPane()));
+    }
+  }
+
+  public abstract void output(WindowedValue<OutputT> output);
+  public abstract <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output);
+
   static <T, W extends BoundedWindow> WindowedValue<T> noElementWindowedValue(
       final T output, final Instant timestamp, WindowFn<Object, W> windowFn) {
     WindowFn<Object, W>.AssignContext assignContext =
@@ -248,8 +249,8 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
       @Override
       public <SideOutputT> void sideOutputWindowedValue(
           TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp,
-          Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-        throw new UnsupportedOperationException();
+          Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
+        sideOutput(tag, WindowedValue.of(output, timestamp, windows, paneInfo));
       }
 
       @Override