You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 19:54:46 UTC
[1/2] incubator-beam git commit: [BEAM-1111] Reject timers for ParDo in SparkRunner streaming evaluators
Repository: incubator-beam
Updated Branches:
refs/heads/master 96f9fce78 -> 0bfa02dd2
[BEAM-1111] Reject timers for ParDo in SparkRunner streaming evaluators
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95e2c53d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95e2c53d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95e2c53d
Branch: refs/heads/master
Commit: 95e2c53db535952aaf0c335e0d3d27a721c6b55d
Parents: 96f9fce
Author: Sela <an...@paypal.com>
Authored: Thu Dec 8 20:29:35 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 11:54:19 2016 -0800
----------------------------------------------------------------------
.../spark/translation/TransformTranslator.java | 28 +----------------
.../spark/translation/TranslationUtils.java | 33 ++++++++++++++++++++
.../streaming/StreamingTransformTranslator.java | 6 ++++
3 files changed, 40 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 8170366..964eb37 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutput
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
+import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
import com.google.common.collect.Maps;
import java.io.IOException;
@@ -32,7 +33,6 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -58,8 +58,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -228,30 +226,6 @@ public final class TransformTranslator {
};
}
- private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
- DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-
- if (signature.stateDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- SparkRunner.class.getSimpleName()));
- }
-
- if (signature.timerDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
- DoFn.TimerId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- SparkRunner.class.getSimpleName()));
- }
- }
-
private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 647f8c3..eddc771 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -24,8 +24,12 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -211,4 +215,33 @@ public final class TranslationUtils {
}
}
+ /**
+ * Reject state and timers {@link DoFn}.
+ *
+ * @param doFn the {@link DoFn} to possibly reject.
+ */
+ public static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+ if (signature.stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
+
+ if (signature.timerDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 85d796a..00df7d4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.spark.translation.streaming;
import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
import com.google.common.collect.Maps;
import java.util.ArrayList;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
@@ -348,6 +350,8 @@ final class StreamingTransformTranslator {
@Override
public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
final EvaluationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ rejectStateAndTimers(doFn);
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
@@ -380,6 +384,8 @@ final class StreamingTransformTranslator {
@Override
public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform,
final EvaluationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ rejectStateAndTimers(doFn);
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
[2/2] incubator-beam git commit: This closes #1553
Posted by ke...@apache.org.
This closes #1553
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0bfa02dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0bfa02dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0bfa02dd
Branch: refs/heads/master
Commit: 0bfa02dd26a7fb80753da1ed130acff1265d093a
Parents: 96f9fce 95e2c53
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 11:54:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 11:54:33 2016 -0800
----------------------------------------------------------------------
.../spark/translation/TransformTranslator.java | 28 +----------------
.../spark/translation/TranslationUtils.java | 33 ++++++++++++++++++++
.../streaming/StreamingTransformTranslator.java | 6 ++++
3 files changed, 40 insertions(+), 27 deletions(-)
----------------------------------------------------------------------