Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:04:58 UTC

[32/50] beam git commit: DataflowRunner: Reject SetState and MapState

DataflowRunner: Reject SetState and MapState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0576445
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0576445
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0576445

Branch: refs/heads/gearpump-runner
Commit: c05764454e73ab93d0602f34b8b7622d46e1d892
Parents: 3785b5b
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 21 20:58:35 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:51:03 2017 -0700

----------------------------------------------------------------------
 .../dataflow/BatchStatefulParDoOverrides.java   |  2 +
 .../dataflow/DataflowPipelineTranslator.java    |  2 +
 .../beam/runners/dataflow/DataflowRunner.java   | 30 +++++++
 .../runners/dataflow/DataflowRunnerTest.java    | 89 ++++++++++++++++++--
 4 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
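For context, after this change DataflowRunner rejects any stateful DoFn that declares a MapState or SetState cell at pipeline submission time, in both batch and streaming, instead of letting the job fail later on the service. Below is a minimal sketch of the kind of user code that now triggers the rejection; the class name, pipeline option setup, and element types are illustrative only, chosen to mirror the tests added in DataflowRunnerTest.java further down.

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.state.MapState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    public class MapStateRejectionExample {
      public static void main(String[] args) {
        // Illustrative options; a real Dataflow job also needs project, staging location, etc.
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of(KV.of("key", 1)))
            .apply(
                ParDo.of(
                    new DoFn<KV<String, Integer>, Void>() {
                      // Declaring a MapState cell is what the new check rejects.
                      @StateId("counts")
                      private final StateSpec<MapState<String, Long>> counts = StateSpecs.map();

                      @ProcessElement
                      public void process() {}
                    }));

        // With this commit, submission fails with:
        //   UnsupportedOperationException: DataflowRunner does not currently support MapState
        p.run();
      }
    }

A DoFn declaring a SetState cell via StateSpecs.set() is rejected the same way, with "SetState" in the message; see the new verifyStateSupported method in DataflowRunner.java below.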


http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index 4d9a57f..41202db 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -145,6 +145,7 @@ public class BatchStatefulParDoOverrides {
     public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) {
       DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
+      DataflowRunner.verifyStateSupported(fn);
 
       PTransform<
               PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
@@ -169,6 +170,7 @@ public class BatchStatefulParDoOverrides {
     public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
       DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
+      DataflowRunner.verifyStateSupported(fn);
 
       PTransform<
               PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,

http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index bfd9b64..6d30544 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -972,6 +972,8 @@ public class DataflowPipelineTranslator {
               fn));
     }
 
+    DataflowRunner.verifyStateSupported(fn);
+
     stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     stepContext.addInput(
         PropertyNames.SERIALIZED_FN,

http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 1741287..4d7f6ac 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -107,6 +107,8 @@ import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.Create;
@@ -119,6 +121,8 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
@@ -136,6 +140,7 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.DateTimeUtils;
@@ -1512,4 +1517,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
     }
   }
+
+  static void verifyStateSupported(DoFn<?, ?> fn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
+    for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
+
+      // https://issues.apache.org/jira/browse/BEAM-1474
+      if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) {
+        throw new UnsupportedOperationException(String.format(
+            "%s does not currently support %s",
+            DataflowRunner.class.getSimpleName(),
+            MapState.class.getSimpleName()
+        ));
+      }
+
+      // https://issues.apache.org/jira/browse/BEAM-1479
+      if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) {
+        throw new UnsupportedOperationException(String.format(
+            "%s does not currently support %s",
+            DataflowRunner.class.getSimpleName(),
+            SetState.class.getSimpleName()
+        ));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index aae21cf..f57c0ee 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -50,6 +50,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.channels.FileChannel;
@@ -82,18 +83,26 @@ import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PValue;
@@ -120,7 +129,7 @@ import org.mockito.stubbing.Answer;
  * Tests for the {@link DataflowRunner}.
  */
 @RunWith(JUnit4.class)
-public class DataflowRunnerTest {
+public class DataflowRunnerTest implements Serializable {
 
   private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
   private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
@@ -130,15 +139,12 @@ public class DataflowRunnerTest {
   private static final String PROJECT_ID = "some-project";
   private static final String REGION_ID = "some-region-1";
 
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-  @Rule
-  public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  @Rule public transient ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
 
-  private Dataflow.Projects.Locations.Jobs mockJobs;
-  private GcsUtil mockGcsUtil;
+  private transient Dataflow.Projects.Locations.Jobs mockJobs;
+  private transient GcsUtil mockGcsUtil;
 
   // Asserts that the given Job has all expected fields set.
   private static void assertValidJob(Job job) {
@@ -1001,6 +1007,71 @@ public class DataflowRunnerTest {
     assertTrue(transform.translated);
   }
 
+  private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(KV.of(13, 42)))
+        .apply(
+            ParDo.of(
+                new DoFn<KV<Integer, Integer>, Void>() {
+                  @StateId("fizzle")
+                  private final StateSpec<MapState<Void, Void>> voidState = StateSpecs.map();
+
+                  @ProcessElement
+                  public void process() {}
+                }));
+
+    thrown.expectMessage("MapState");
+    thrown.expect(UnsupportedOperationException.class);
+    p.run();
+  }
+
+  @Test
+  public void testMapStateUnsupportedInBatch() throws Exception {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(false);
+    verifyMapStateUnsupported(options);
+  }
+
+  @Test
+  public void testMapStateUnsupportedInStreaming() throws Exception {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(true);
+    verifyMapStateUnsupported(options);
+  }
+
+  private void verifySetStateUnsupported(PipelineOptions options) throws Exception {
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(KV.of(13, 42)))
+        .apply(
+            ParDo.of(
+                new DoFn<KV<Integer, Integer>, Void>() {
+                  @StateId("fizzle")
+                  private final StateSpec<SetState<Void>> voidState = StateSpecs.set();
+
+                  @ProcessElement
+                  public void process() {}
+                }));
+
+    thrown.expectMessage("SetState");
+    thrown.expect(UnsupportedOperationException.class);
+    p.run();
+  }
+
+  @Test
+  public void testSetStateUnsupportedInBatch() throws Exception {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(false);
+    Pipeline p = Pipeline.create(options);
+    verifySetStateUnsupported(options);
+  }
+
+  @Test
+  public void testSetStateUnsupportedInStreaming() throws Exception {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(true);
+    verifySetStateUnsupported(options);
+  }
+
   /** Records all the composite transforms visited within the Pipeline. */
   private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
     private List<PTransform<?, ?>> transforms = new ArrayList<>();