You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 19:54:46 UTC

[1/2] incubator-beam git commit: [BEAM-1111] Reject timers for ParDo in SparkRunner streaming evaluators

Repository: incubator-beam
Updated Branches:
  refs/heads/master 96f9fce78 -> 0bfa02dd2


[BEAM-1111] Reject timers for ParDo in SparkRunner streaming evaluators


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95e2c53d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95e2c53d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95e2c53d

Branch: refs/heads/master
Commit: 95e2c53db535952aaf0c335e0d3d27a721c6b55d
Parents: 96f9fce
Author: Sela <an...@paypal.com>
Authored: Thu Dec 8 20:29:35 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 11:54:19 2016 -0800

----------------------------------------------------------------------
 .../spark/translation/TransformTranslator.java  | 28 +----------------
 .../spark/translation/TranslationUtils.java     | 33 ++++++++++++++++++++
 .../streaming/StreamingTransformTranslator.java |  6 ++++
 3 files changed, 40 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 8170366..964eb37 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutput
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
+import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
 import com.google.common.collect.Maps;
 import java.io.IOException;
@@ -32,7 +33,6 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -58,8 +58,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -228,30 +226,6 @@ public final class TransformTranslator {
     };
   }
 
-  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-
-    if (signature.stateDeclarations().size() > 0) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-              DoFn.StateId.class.getSimpleName(),
-              doFn.getClass().getName(),
-              DoFn.class.getSimpleName(),
-              SparkRunner.class.getSimpleName()));
-    }
-
-    if (signature.timerDeclarations().size() > 0) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
-              DoFn.TimerId.class.getSimpleName(),
-              doFn.getClass().getName(),
-              DoFn.class.getSimpleName(),
-              SparkRunner.class.getSimpleName()));
-    }
-  }
-
   private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 647f8c3..eddc771 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -24,8 +24,12 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -211,4 +215,33 @@ public final class TranslationUtils {
     }
   }
 
+  /**
+   * Reject state and timers {@link DoFn}.
+   *
+   * @param doFn the {@link DoFn} to possibly reject.
+   */
+  public static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              SparkRunner.class.getSimpleName()));
+    }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              SparkRunner.class.getSimpleName()));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 85d796a..00df7d4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -348,6 +350,8 @@ final class StreamingTransformTranslator {
       @Override
       public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
                            final EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
@@ -380,6 +384,8 @@ final class StreamingTransformTranslator {
       @Override
       public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform,
                            final EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);


[2/2] incubator-beam git commit: This closes #1553

Posted by ke...@apache.org.
This closes #1553


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0bfa02dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0bfa02dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0bfa02dd

Branch: refs/heads/master
Commit: 0bfa02dd26a7fb80753da1ed130acff1265d093a
Parents: 96f9fce 95e2c53
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 11:54:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 11:54:33 2016 -0800

----------------------------------------------------------------------
 .../spark/translation/TransformTranslator.java  | 28 +----------------
 .../spark/translation/TranslationUtils.java     | 33 ++++++++++++++++++++
 .../streaming/StreamingTransformTranslator.java |  6 ++++
 3 files changed, 40 insertions(+), 27 deletions(-)
----------------------------------------------------------------------