You are viewing a plain text version of this content; the canonical HTML version is available from the mailing-list archive.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:04:58 UTC
[32/50] beam git commit: DataflowRunner: Reject SetState and MapState
DataflowRunner: Reject SetState and MapState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0576445
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0576445
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0576445
Branch: refs/heads/gearpump-runner
Commit: c05764454e73ab93d0602f34b8b7622d46e1d892
Parents: 3785b5b
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 21 20:58:35 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:51:03 2017 -0700
----------------------------------------------------------------------
.../dataflow/BatchStatefulParDoOverrides.java | 2 +
.../dataflow/DataflowPipelineTranslator.java | 2 +
.../beam/runners/dataflow/DataflowRunner.java | 30 +++++++
.../runners/dataflow/DataflowRunnerTest.java | 89 ++++++++++++++++++--
4 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index 4d9a57f..41202db 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -145,6 +145,7 @@ public class BatchStatefulParDoOverrides {
public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) {
DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
verifyFnIsStateful(fn);
+ DataflowRunner.verifyStateSupported(fn);
PTransform<
PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
@@ -169,6 +170,7 @@ public class BatchStatefulParDoOverrides {
public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
verifyFnIsStateful(fn);
+ DataflowRunner.verifyStateSupported(fn);
PTransform<
PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index bfd9b64..6d30544 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -972,6 +972,8 @@ public class DataflowPipelineTranslator {
fn));
}
+ DataflowRunner.verifyStateSupported(fn);
+
stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 1741287..4d7f6ac 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -107,6 +107,8 @@ import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.Create;
@@ -119,6 +121,8 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
@@ -136,6 +140,7 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.DateTimeUtils;
@@ -1512,4 +1517,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
}
}
+
+  /**
+   * Throws {@link UnsupportedOperationException} if {@code fn} declares any state cell of a
+   * type that this runner does not currently support, so that pipelines fail fast at
+   * construction/translation time instead of at the service.
+   */
+  static void verifyStateSupported(DoFn<?, ?> fn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
+    for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
+      // https://issues.apache.org/jira/browse/BEAM-1474
+      rejectIfStateType(stateDecl, MapState.class);
+      // https://issues.apache.org/jira/browse/BEAM-1479
+      rejectIfStateType(stateDecl, SetState.class);
+    }
+  }
+
+  /** Rejects {@code stateDecl} when its declared state type is a subtype of {@code unsupportedType}. */
+  private static void rejectIfStateType(
+      DoFnSignature.StateDeclaration stateDecl, Class<?> unsupportedType) {
+    if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(unsupportedType))) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s does not currently support %s",
+              DataflowRunner.class.getSimpleName(),
+              unsupportedType.getSimpleName()));
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index aae21cf..f57c0ee 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -50,6 +50,7 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.channels.FileChannel;
@@ -82,18 +83,26 @@ import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
@@ -120,7 +129,7 @@ import org.mockito.stubbing.Answer;
* Tests for the {@link DataflowRunner}.
*/
@RunWith(JUnit4.class)
-public class DataflowRunnerTest {
+public class DataflowRunnerTest implements Serializable {
private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
@@ -130,15 +139,12 @@ public class DataflowRunnerTest {
private static final String PROJECT_ID = "some-project";
private static final String REGION_ID = "some-region-1";
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
- @Rule
- public ExpectedException thrown = ExpectedException.none();
- @Rule
- public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
+ @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+ @Rule public transient ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
- private Dataflow.Projects.Locations.Jobs mockJobs;
- private GcsUtil mockGcsUtil;
+ private transient Dataflow.Projects.Locations.Jobs mockJobs;
+ private transient GcsUtil mockGcsUtil;
// Asserts that the given Job has all expected fields set.
private static void assertValidJob(Job job) {
@@ -1001,6 +1007,71 @@ public class DataflowRunnerTest {
assertTrue(transform.translated);
}
+ // Builds a pipeline containing a stateful DoFn on a keyed PCollection that declares a
+ // MapState cell, then asserts that running it is rejected with an
+ // UnsupportedOperationException whose message mentions "MapState".
+ private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
+ Pipeline p = Pipeline.create(options);
+ p.apply(Create.of(KV.of(13, 42)))
+ .apply(
+ ParDo.of(
+ new DoFn<KV<Integer, Integer>, Void>() {
+ // Declared but never read/written: only the declaration is needed to
+ // trigger the runner's state-type check.
+ @StateId("fizzle")
+ private final StateSpec<MapState<Void, Void>> voidState = StateSpecs.map();
+
+ @ProcessElement
+ public void process() {}
+ }));
+
+ // Expectations are set just before run(); the rejection is expected to happen
+ // during pipeline translation/submission, not during graph construction above.
+ thrown.expectMessage("MapState");
+ thrown.expect(UnsupportedOperationException.class);
+ p.run();
+ }
+
+ // MapState must be rejected when the runner is configured for batch execution
+ // (streaming=false).
+ @Test
+ public void testMapStateUnsupportedInBatch() throws Exception {
+ PipelineOptions options = buildPipelineOptions();
+ options.as(StreamingOptions.class).setStreaming(false);
+ verifyMapStateUnsupported(options);
+ }
+
+ // MapState must be rejected when the runner is configured for streaming execution
+ // (streaming=true).
+ @Test
+ public void testMapStateUnsupportedInStreaming() throws Exception {
+ PipelineOptions options = buildPipelineOptions();
+ options.as(StreamingOptions.class).setStreaming(true);
+ verifyMapStateUnsupported(options);
+ }
+
+ // Builds a pipeline containing a stateful DoFn on a keyed PCollection that declares a
+ // SetState cell, then asserts that running it is rejected with an
+ // UnsupportedOperationException whose message mentions "SetState".
+ private void verifySetStateUnsupported(PipelineOptions options) throws Exception {
+ Pipeline p = Pipeline.create(options);
+ p.apply(Create.of(KV.of(13, 42)))
+ .apply(
+ ParDo.of(
+ new DoFn<KV<Integer, Integer>, Void>() {
+ // Declared but never read/written: only the declaration is needed to
+ // trigger the runner's state-type check.
+ @StateId("fizzle")
+ private final StateSpec<SetState<Void>> voidState = StateSpecs.set();
+
+ @ProcessElement
+ public void process() {}
+ }));
+
+ // Expectations are set just before run(); the rejection is expected to happen
+ // during pipeline translation/submission, not during graph construction above.
+ thrown.expectMessage("SetState");
+ thrown.expect(UnsupportedOperationException.class);
+ p.run();
+ }
+
+  // SetState must be rejected when the runner is configured for batch execution
+  // (streaming=false).
+  @Test
+  public void testSetStateUnsupportedInBatch() throws Exception {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(false);
+    // Removed a stray, unused "Pipeline p = Pipeline.create(options);" local here:
+    // verifySetStateUnsupported builds its own pipeline from the options, matching the
+    // structure of testMapStateUnsupportedInBatch.
+    verifySetStateUnsupported(options);
+  }
+
+ // SetState must be rejected when the runner is configured for streaming execution
+ // (streaming=true).
+ @Test
+ public void testSetStateUnsupportedInStreaming() throws Exception {
+ PipelineOptions options = buildPipelineOptions();
+ options.as(StreamingOptions.class).setStreaming(true);
+ verifySetStateUnsupported(options);
+ }
+
/** Records all the composite transforms visited within the Pipeline. */
private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
private List<PTransform<?, ?>> transforms = new ArrayList<>();