You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/26 17:47:44 UTC

[1/7] beam git commit: Make Java serialized CombineFn URN public

Repository: beam
Updated Branches:
  refs/heads/master 6bb204f34 -> c687887d2


Make Java serialized CombineFn URN public


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a250ce58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a250ce58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a250ce58

Branch: refs/heads/master
Commit: a250ce58c6a0caf473842c4e5e6f980a828dde55
Parents: 8fc2eb0
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 22:51:18 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:22:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/construction/CombineTranslation.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a250ce58/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 855fba7..28bc9a1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection;
  * RunnerApi.CombinePayload} protos.
  */
 public class CombineTranslation {
-  private static final String JAVA_SERIALIZED_COMBINE_FN_URN = "urn:beam:java:combinefn:v1";
+  public static final String JAVA_SERIALIZED_COMBINE_FN_URN = "urn:beam:java:combinefn:v1";
 
   public static CombinePayload toProto(
       AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> combine, SdkComponents sdkComponents)


[7/7] beam git commit: This closes #3233: [BEAM-115] Runner API Translations for StateSpec and TimerSpec

Posted by ke...@apache.org.
This closes #3233: [BEAM-115] Runner API Translations for StateSpec and TimerSpec

  Implement TimerSpec and StateSpec translation
  Make Java serialized CombineFn URN public
  Add case dispatch to StateSpec
  Flesh out TimerSpec and StateSpec in Runner API
  Allow translation to throw IOException
  Mark CombineFnWithContext StateSpecs internal


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c687887d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c687887d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c687887d

Branch: refs/heads/master
Commit: c687887d2c1313f8c6c1a6d3b0b17a6808a5f134
Parents: 6bb204f 39220db
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 10:36:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:36:50 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CombineTranslation.java   |   4 +-
 .../construction/PTransformTranslation.java     |   3 +-
 .../core/construction/ParDoTranslation.java     | 215 ++++++++++++++++--
 .../core/construction/ParDoTranslationTest.java | 218 ++++++++++++-------
 .../src/main/proto/beam_runner_api.proto        |  40 +++-
 .../org/apache/beam/sdk/state/StateSpec.java    |  53 +++++
 .../org/apache/beam/sdk/state/StateSpecs.java   |  53 ++++-
 7 files changed, 478 insertions(+), 108 deletions(-)
----------------------------------------------------------------------



[5/7] beam git commit: Flesh out TimerSpec and StateSpec in Runner API

Posted by ke...@apache.org.
Flesh out TimerSpec and StateSpec in Runner API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9497e5ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9497e5ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9497e5ea

Branch: refs/heads/master
Commit: 9497e5eaecf5d7eb7f18709935c183b03116f75f
Parents: b0dc523
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 07:12:08 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:22:37 2017 -0700

----------------------------------------------------------------------
 .../src/main/proto/beam_runner_api.proto        | 40 ++++++++++++++------
 1 file changed, 29 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9497e5ea/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index c8722e6..1612209 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -247,21 +247,39 @@ message Parameter {
 }
 
 message StateSpec {
-  // TODO: AST for state spec
-  string id = 1;
-  Type type = 2;
-
-  enum Type {
-    VALUE = 0;
-    BAG = 1;
-    MAP = 2;
-    SET = 3;
+  oneof spec {
+    ValueStateSpec value_spec = 1;
+    BagStateSpec bag_spec = 2;
+    CombiningStateSpec combining_spec = 3;
+    MapStateSpec map_spec = 4;
+    SetStateSpec set_spec = 5;
   }
 }
 
+message ValueStateSpec {
+  string coder_id = 1;
+}
+
+message BagStateSpec {
+  string element_coder_id = 1;
+}
+
+message CombiningStateSpec {
+  string accumulator_coder_id = 1;
+  SdkFunctionSpec combine_fn = 2;
+}
+
+message MapStateSpec {
+  string key_coder_id = 1;
+  string value_coder_id = 2;
+}
+
+message SetStateSpec {
+  string element_coder_id = 1;
+}
+
 message TimerSpec {
-  // TODO: AST for timer spec
-  string id = 1;
+  TimeDomain time_domain = 1;
 }
 
 enum IsBounded {


[6/7] beam git commit: Implement TimerSpec and StateSpec translation

Posted by ke...@apache.org.
Implement TimerSpec and StateSpec translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39220dbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39220dbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39220dbc

Branch: refs/heads/master
Commit: 39220dbca944a2496587c543de2a4eb01004bd76
Parents: a250ce5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 07:12:29 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:36:35 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CombineTranslation.java   |   2 +-
 .../core/construction/ParDoTranslation.java     | 215 ++++++++++++++++--
 .../core/construction/ParDoTranslationTest.java | 218 ++++++++++++-------
 3 files changed, 343 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/39220dbc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 28bc9a1..472b6f8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -86,7 +86,7 @@ public class CombineTranslation {
         .getAccumulatorCoder();
   }
 
-  private static SdkFunctionSpec toProto(GlobalCombineFn<?, ?, ?> combineFn) {
+  public static SdkFunctionSpec toProto(GlobalCombineFn<?, ?, ?> combineFn) {
     return SdkFunctionSpec.newBuilder()
         // TODO: Set Java SDK Environment URN
         .setSpec(

http://git-wip-us.apache.org/repos/asf/beam/blob/39220dbc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 1c81f8c..fe66179 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -19,10 +19,12 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -46,9 +48,12 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -107,7 +112,8 @@ public class ParDoTranslation {
 
     @Override
     public FunctionSpec translate(
-        AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) {
+        AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components)
+        throws IOException {
       ParDoPayload payload = toProto(transform.getTransform(), components);
       return RunnerApi.FunctionSpec.newBuilder()
           .setUrn(PAR_DO_TRANSFORM_URN)
@@ -128,8 +134,10 @@ public class ParDoTranslation {
     }
   }
 
-  public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) {
-    DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
+  public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
+  throws IOException {
+    DoFn<?, ?> doFn = parDo.getFn();
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     Map<String, StateDeclaration> states = signature.stateDeclarations();
     Map<String, TimerDeclaration> timers = signature.timerDeclarations();
     List<Parameter> parameters = signature.processElement().extraParameters();
@@ -146,16 +154,62 @@ public class ParDoTranslation {
       }
     }
     for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
-      StateSpec spec = toProto(state.getValue());
+      RunnerApi.StateSpec spec =
+          toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
       builder.putStateSpecs(state.getKey(), spec);
     }
     for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
-      TimerSpec spec = toProto(timer.getValue());
+      RunnerApi.TimerSpec spec =
+          toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
       builder.putTimerSpecs(timer.getKey(), spec);
     }
     return builder.build();
   }
 
+  private static StateSpec<?> getStateSpecOrCrash(
+      StateDeclaration stateDeclaration, DoFn<?, ?> target) {
+    try {
+      Object fieldValue = stateDeclaration.field().get(target);
+      checkState(fieldValue instanceof StateSpec,
+          "Malformed %s class %s: state declaration field %s does not have type %s.",
+          DoFn.class.getSimpleName(),
+          target.getClass().getName(),
+          stateDeclaration.field().getName(),
+          StateSpec.class);
+
+      return (StateSpec<?>) stateDeclaration.field().get(target);
+    } catch (IllegalAccessException exc) {
+      throw new RuntimeException(
+          String.format(
+              "Malformed %s class %s: state declaration field %s is not accessible.",
+              DoFn.class.getSimpleName(),
+              target.getClass().getName(),
+              stateDeclaration.field().getName()));
+    }
+  }
+
+  private static TimerSpec getTimerSpecOrCrash(
+      TimerDeclaration timerDeclaration, DoFn<?, ?> target) {
+    try {
+      Object fieldValue = timerDeclaration.field().get(target);
+      checkState(fieldValue instanceof TimerSpec,
+          "Malformed %s class %s: timer declaration field %s does not have type %s.",
+          DoFn.class.getSimpleName(),
+          target.getClass().getName(),
+          timerDeclaration.field().getName(),
+          TimerSpec.class);
+
+      return (TimerSpec) timerDeclaration.field().get(target);
+    } catch (IllegalAccessException exc) {
+      throw new RuntimeException(
+          String.format(
+              "Malformed %s class %s: timer declaration field %s is not accessible.",
+              DoFn.class.getSimpleName(),
+              target.getClass().getName(),
+              timerDeclaration.field().getName()));
+    }
+  }
+
   public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
   }
@@ -179,14 +233,149 @@ public class ParDoTranslation {
     return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
   }
 
-  // TODO: Implement
-  private static StateSpec toProto(StateDeclaration state) {
-    throw new UnsupportedOperationException("Not yet supported");
+  @VisibleForTesting
+  static RunnerApi.StateSpec toProto(StateSpec<?> stateSpec, final SdkComponents components)
+      throws IOException {
+    final RunnerApi.StateSpec.Builder builder = RunnerApi.StateSpec.newBuilder();
+
+    return stateSpec.match(
+        new StateSpec.Cases<RunnerApi.StateSpec>() {
+          @Override
+          public RunnerApi.StateSpec dispatchValue(Coder<?> valueCoder) {
+            return builder
+                .setValueSpec(
+                    RunnerApi.ValueStateSpec.newBuilder()
+                        .setCoderId(registerCoderOrThrow(components, valueCoder)))
+                .build();
+          }
+
+          @Override
+          public RunnerApi.StateSpec dispatchBag(Coder<?> elementCoder) {
+            return builder
+                .setBagSpec(
+                    RunnerApi.BagStateSpec.newBuilder()
+                        .setElementCoderId(registerCoderOrThrow(components, elementCoder)))
+                .build();
+          }
+
+          @Override
+          public RunnerApi.StateSpec dispatchCombining(
+              Combine.CombineFn<?, ?, ?> combineFn, Coder<?> accumCoder) {
+            return builder
+                .setCombiningSpec(
+                    RunnerApi.CombiningStateSpec.newBuilder()
+                        .setAccumulatorCoderId(registerCoderOrThrow(components, accumCoder))
+                        .setCombineFn(CombineTranslation.toProto(combineFn)))
+                .build();
+          }
+
+          @Override
+          public RunnerApi.StateSpec dispatchMap(Coder<?> keyCoder, Coder<?> valueCoder) {
+            return builder
+                .setMapSpec(
+                    RunnerApi.MapStateSpec.newBuilder()
+                        .setKeyCoderId(registerCoderOrThrow(components, keyCoder))
+                        .setValueCoderId(registerCoderOrThrow(components, valueCoder)))
+                .build();
+          }
+
+          @Override
+          public RunnerApi.StateSpec dispatchSet(Coder<?> elementCoder) {
+            return builder
+                .setSetSpec(
+                    RunnerApi.SetStateSpec.newBuilder()
+                        .setElementCoderId(registerCoderOrThrow(components, elementCoder)))
+                .build();
+          }
+        });
+  }
+
+  @VisibleForTesting
+  static StateSpec<?> fromProto(RunnerApi.StateSpec stateSpec, RunnerApi.Components components)
+      throws IOException {
+    switch (stateSpec.getSpecCase()) {
+      case VALUE_SPEC:
+        return StateSpecs.value(
+            CoderTranslation.fromProto(
+                components.getCodersMap().get(stateSpec.getValueSpec().getCoderId()), components));
+      case BAG_SPEC:
+        return StateSpecs.bag(
+            CoderTranslation.fromProto(
+                components.getCodersMap().get(stateSpec.getBagSpec().getElementCoderId()),
+                components));
+      case COMBINING_SPEC:
+        FunctionSpec combineFnSpec = stateSpec.getCombiningSpec().getCombineFn().getSpec();
+
+        if (!combineFnSpec.getUrn().equals(CombineTranslation.JAVA_SERIALIZED_COMBINE_FN_URN)) {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Cannot create %s from non-Java %s: %s",
+                  StateSpec.class.getSimpleName(),
+                  Combine.CombineFn.class.getSimpleName(),
+                  combineFnSpec.getUrn()));
+        }
+
+        Combine.CombineFn<?, ?, ?> combineFn =
+            (Combine.CombineFn<?, ?, ?>)
+                SerializableUtils.deserializeFromByteArray(
+                    combineFnSpec.getParameter().unpack(BytesValue.class).toByteArray(),
+                    Combine.CombineFn.class.getSimpleName());
+
+        // Rawtype coder cast because it is required to be a valid accumulator coder
+        // for the CombineFn, by construction
+        return StateSpecs.combining(
+            (Coder)
+                CoderTranslation.fromProto(
+                    components
+                        .getCodersMap()
+                        .get(stateSpec.getCombiningSpec().getAccumulatorCoderId()),
+                    components),
+            combineFn);
+
+      case MAP_SPEC:
+        return StateSpecs.map(
+            CoderTranslation.fromProto(
+                components.getCodersOrThrow(stateSpec.getMapSpec().getKeyCoderId()), components),
+            CoderTranslation.fromProto(
+                components.getCodersOrThrow(stateSpec.getMapSpec().getValueCoderId()), components));
+
+      case SET_SPEC:
+        return StateSpecs.set(
+            CoderTranslation.fromProto(
+                components.getCodersMap().get(stateSpec.getSetSpec().getElementCoderId()),
+                components));
+
+      case SPEC_NOT_SET:
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown %s: %s", RunnerApi.StateSpec.class.getName(), stateSpec));
+
+    }
+  }
+
+  private static String registerCoderOrThrow(SdkComponents components, Coder coder) {
+    try {
+      return components.registerCoder(coder);
+    } catch (IOException exc) {
+      throw new RuntimeException("Failure to register coder", exc);
+    }
   }
 
-  // TODO: Implement
-  private static TimerSpec toProto(TimerDeclaration timer) {
-    throw new UnsupportedOperationException("Not yet supported");
+  private static RunnerApi.TimerSpec toProto(TimerSpec timer) {
+    return RunnerApi.TimerSpec.newBuilder().setTimeDomain(toProto(timer.getTimeDomain())).build();
+  }
+
+  private static RunnerApi.TimeDomain toProto(TimeDomain timeDomain) {
+    switch(timeDomain) {
+      case EVENT_TIME:
+        return RunnerApi.TimeDomain.EVENT_TIME;
+      case PROCESSING_TIME:
+        return RunnerApi.TimeDomain.PROCESSING_TIME;
+      case SYNCHRONIZED_PROCESSING_TIME:
+        return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
+      default:
+        throw new IllegalArgumentException("Unknown time domain");
+    }
   }
 
   @AutoValue

http://git-wip-us.apache.org/repos/asf/beam/blob/39220dbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index ec27957..46f6a80 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
@@ -62,98 +63,159 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Suite;
 
 /** Tests for {@link ParDoTranslation}. */
-@RunWith(Parameterized.class)
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+  ParDoTranslationTest.TestParDoPayloadTranslation.class,
+  ParDoTranslationTest.TestStateAndTimerTranslation.class
+})
 public class ParDoTranslationTest {
-  public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
-  private static PCollectionView<Long> singletonSideInput =
-      p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
-          .apply(View.<Long>asSingleton());
-  private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput =
-      p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
-          .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
-          .apply(View.<Long, String>asMultimap());
-
-  private static PCollection<KV<Long, String>> mainInput =
-      p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<ParDo.MultiOutput<?, ?>> data() {
-    return ImmutableList.<ParDo.MultiOutput<?, ?>>of(
-        ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
-        ParDo.of(new DropElementsFn())
-            .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
-            .withSideInputs(singletonSideInput, multimapSideInput),
-        ParDo.of(new DropElementsFn())
-            .withOutputTags(
-                new TupleTag<Void>(),
-                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
-            .withSideInputs(singletonSideInput, multimapSideInput),
-        ParDo.of(new DropElementsFn())
-            .withOutputTags(
-                new TupleTag<Void>(),
-                TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
-  }
 
-  @Parameter(0)
-  public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
+  /**
+   * Tests for translating various {@link ParDo} transforms to/from {@link ParDoPayload} protos.
+   */
+  @RunWith(Parameterized.class)
+  public static class TestParDoPayloadTranslation {
+    public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+    private static PCollectionView<Long> singletonSideInput =
+        p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L))
+            .apply(View.<Long>asSingleton());
+    private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput =
+        p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam")))
+            .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))
+            .apply(View.<Long, String>asMultimap());
 
-  @Test
-  public void testToAndFromProto() throws Exception {
-    SdkComponents components = SdkComponents.create();
-    ParDoPayload payload = ParDoTranslation.toProto(parDo, components);
+    private static PCollection<KV<Long, String>> mainInput =
+        p.apply(
+            "CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())));
 
-    assertThat(ParDoTranslation.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
-    assertThat(
-        ParDoTranslation.getMainOutputTag(payload),
-        Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
-    for (PCollectionView<?> view : parDo.getSideInputs()) {
-      payload.getSideInputsOrThrow(view.getTagInternal().getId());
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<ParDo.MultiOutput<?, ?>> data() {
+      return ImmutableList.<ParDo.MultiOutput<?, ?>>of(
+          ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()),
+          ParDo.of(new DropElementsFn())
+              .withOutputTags(new TupleTag<Void>(), TupleTagList.empty())
+              .withSideInputs(singletonSideInput, multimapSideInput),
+          ParDo.of(new DropElementsFn())
+              .withOutputTags(
+                  new TupleTag<Void>(),
+                  TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))
+              .withSideInputs(singletonSideInput, multimapSideInput),
+          ParDo.of(new DropElementsFn())
+              .withOutputTags(
+                  new TupleTag<Void>(),
+                  TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
     }
-  }
 
-  @Test
-  public void toAndFromTransformProto() throws Exception {
-    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
-    inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
-    inputs.putAll(parDo.getAdditionalInputs());
-    PCollectionTuple output = mainInput.apply(parDo);
-
-    SdkComponents components = SdkComponents.create();
-    String transformId =
-        components.registerPTransform(
-            AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
-                "foo", inputs, output.expand(), parDo, p),
-            Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
-    Components protoComponents = components.toComponents();
-    RunnerApi.PTransform protoTransform =
-        protoComponents.getTransformsOrThrow(transformId);
-    ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
-    for (PCollectionView<?> view : parDo.getSideInputs()) {
-      SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
-      PCollectionView<?> restoredView =
-          ParDoTranslation.fromProto(
-              sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
-      assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
-      assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
+    @Parameter(0)
+    public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
+
+    @Test
+    public void testToAndFromProto() throws Exception {
+      SdkComponents components = SdkComponents.create();
+      ParDoPayload payload = ParDoTranslation.toProto(parDo, components);
+
+      assertThat(ParDoTranslation.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn()));
       assertThat(
-          restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
+          ParDoTranslation.getMainOutputTag(payload),
+          Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag()));
+      for (PCollectionView<?> view : parDo.getSideInputs()) {
+        payload.getSideInputsOrThrow(view.getTagInternal().getId());
+      }
+    }
+
+    @Test
+    public void toAndFromTransformProto() throws Exception {
+      Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+      inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
+      inputs.putAll(parDo.getAdditionalInputs());
+      PCollectionTuple output = mainInput.apply(parDo);
+
+      SdkComponents components = SdkComponents.create();
+      String transformId =
+          components.registerPTransform(
+              AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
+                  "foo", inputs, output.expand(), parDo, p),
+              Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+
+      Components protoComponents = components.toComponents();
+      RunnerApi.PTransform protoTransform = protoComponents.getTransformsOrThrow(transformId);
+      ParDoPayload parDoPayload =
+          protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+      for (PCollectionView<?> view : parDo.getSideInputs()) {
+        SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
+        PCollectionView<?> restoredView =
+            ParDoTranslation.fromProto(
+                sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
+        assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+        assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
+        assertThat(
+            restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
+        assertThat(
+            restoredView.getWindowingStrategyInternal(),
+            Matchers.<WindowingStrategy<?, ?>>equalTo(
+                view.getWindowingStrategyInternal().fixDefaults()));
+        assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+      }
+      String mainInputId = components.registerPCollection(mainInput);
       assertThat(
-          restoredView.getWindowingStrategyInternal(),
-          Matchers.<WindowingStrategy<?, ?>>equalTo(
-              view.getWindowingStrategyInternal().fixDefaults()));
-      assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+          ParDoTranslation.getMainInput(protoTransform, protoComponents),
+          equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
+    }
+  }
+
+  /**
+   * Tests for translating state and timer bits to/from protos.
+   */
+  @RunWith(JUnit4.class)
+  public static class TestStateAndTimerTranslation {
+
+    @Test
+    public void testValueStateSpecToFromProto() throws Exception {
+      SdkComponents sdkComponents = SdkComponents.create();
+      StateSpec<?> stateSpec = StateSpecs.value(VarIntCoder.of());
+      StateSpec<?> deserializedStateSpec =
+          ParDoTranslation.fromProto(
+              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
+      assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
+    }
+
+    @Test
+    public void testBagStateSpecToFromProto() throws Exception {
+      SdkComponents sdkComponents = SdkComponents.create();
+      StateSpec<?> stateSpec = StateSpecs.bag(VarIntCoder.of());
+      StateSpec<?> deserializedStateSpec =
+          ParDoTranslation.fromProto(
+              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
+      assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
+    }
+
+    @Test
+    public void testSetStateSpecToFromProto() throws Exception {
+      SdkComponents sdkComponents = SdkComponents.create();
+      StateSpec<?> stateSpec = StateSpecs.set(VarIntCoder.of());
+      StateSpec<?> deserializedStateSpec =
+          ParDoTranslation.fromProto(
+              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
+      assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
+    }
+
+    @Test
+    public void testMapStateSpecToFromProto() throws Exception {
+      SdkComponents sdkComponents = SdkComponents.create();
+      StateSpec<?> stateSpec = StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
+      StateSpec<?> deserializedStateSpec =
+          ParDoTranslation.fromProto(
+              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
+      assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
     }
-    String mainInputId = components.registerPCollection(mainInput);
-    assertThat(
-        ParDoTranslation.getMainInput(protoTransform, protoComponents),
-        equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
   }
 
   private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {


[3/7] beam git commit: Mark CombineFnWithContext StateSpecs internal

Posted by ke...@apache.org.
Mark CombineFnWithContext StateSpecs internal


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8478fe1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8478fe1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8478fe1

Branch: refs/heads/master
Commit: c8478fe1fe107b842d3cfa56b652d740fdf0c18b
Parents: 6bb204f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 07:25:08 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:22:37 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/state/StateSpecs.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c8478fe1/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
index 7b71384..5a2a1b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
@@ -76,7 +76,9 @@ public class StateSpecs {
   }
 
   /**
-   * Create a {@link StateSpec} for a {@link CombiningState} which uses a {@link
+   * <b>For internal use only; no backwards compatibility guarantees</b>
+   *
+   * <p>Create a {@link StateSpec} for a {@link CombiningState} which uses a {@link
    * CombineFnWithContext} to automatically merge multiple values of type {@code InputT} into a
    * single resulting {@code OutputT}.
    *
@@ -84,6 +86,7 @@ public class StateSpecs {
    *
    * @see #combining(Coder, CombineFnWithContext)
    */
+  @Internal
   public static <InputT, AccumT, OutputT>
       StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
           CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
@@ -105,11 +108,14 @@ public class StateSpecs {
   }
 
   /**
-   * Identical to {@link #combining(CombineFnWithContext)}, but with an accumulator coder explicitly
-   * supplied.
+   * <b>For internal use only; no backwards compatibility guarantees</b>
+   *
+   * <p>Identical to {@link #combining(CombineFnWithContext)}, but with an accumulator coder
+   * explicitly supplied.
    *
    * <p>If automatic coder inference fails, use this method.
    */
+  @Internal
   public static <InputT, AccumT, OutputT>
       StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
           Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {


[2/7] beam git commit: Allow translation to throw IOException

Posted by ke...@apache.org.
Allow translation to throw IOException


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0dc523c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0dc523c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0dc523c

Branch: refs/heads/master
Commit: b0dc523c72a68e870392fbac8ff9f3a87459ab22
Parents: c8478fe
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 13:02:15 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:22:37 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/PTransformTranslation.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b0dc523c/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 9f5f3b5..00ea55e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -138,7 +138,8 @@ public class PTransformTranslation {
    */
   public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
     String getUrn(T transform);
-    FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components);
+    FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
+        throws IOException;
   }
 
   /**


[4/7] beam git commit: Add case dispatch to StateSpec

Posted by ke...@apache.org.
Add case dispatch to StateSpec

This is different than a StateBinder: for a binder, the id is needed and
the StateSpec controls the return type. For case dispatch, the
dispatcher controls the type and it should just be reading the spec,
which does not require the id. Eventually, StateBinder could be removed
in favor of StateSpec.Cases<Function<String, StateT>>.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fc2eb0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fc2eb0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fc2eb0a

Branch: refs/heads/master
Commit: 8fc2eb0aeee9c3bdeaf93897e5e8aa4bb98b98de
Parents: 9497e5e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 07:27:52 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:22:37 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/state/StateSpec.java    | 53 ++++++++++++++++++++
 .../org/apache/beam/sdk/state/StateSpecs.java   | 41 +++++++++++++++
 2 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8fc2eb0a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
index b0412bf..0443f25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
 
 /**
  * A specification of a persistent state cell. This includes information necessary to encode the
@@ -43,6 +44,14 @@ public interface StateSpec<StateT extends State> extends Serializable {
   /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
+   * <p>Perform case analysis on this {@link StateSpec} using the provided {@link Cases}.
+   */
+  @Internal
+  <ResultT> ResultT match(Cases<ResultT> cases);
+
+  /**
+   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
+   *
    * <p>Given {code coders} are inferred from type arguments defined for this class. Coders which
    * are already set should take precedence over offered coders.
    *
@@ -60,4 +69,48 @@ public interface StateSpec<StateT extends State> extends Serializable {
    */
   @Internal
   void finishSpecifying();
+
+  /**
+   * Cases for doing a "switch" on the type of {@link StateSpec}.
+   */
+  interface Cases<ResultT> {
+    ResultT dispatchValue(Coder<?> valueCoder);
+    ResultT dispatchBag(Coder<?> elementCoder);
+    ResultT dispatchCombining(Combine.CombineFn<?, ?, ?> combineFn, Coder<?> accumCoder);
+    ResultT dispatchMap(Coder<?> keyCoder, Coder<?> valueCoder);
+    ResultT dispatchSet(Coder<?> elementCoder);
+
+    /**
+     * A base class for a visitor with a default method for cases it is not interested in.
+     */
+    abstract class WithDefault<ResultT> implements Cases<ResultT> {
+
+      protected abstract ResultT dispatchDefault();
+
+      @Override
+      public ResultT dispatchValue(Coder<?> valueCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchBag(Coder<?> elementCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchCombining(Combine.CombineFn<?, ?, ?> combineFn, Coder<?> accumCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchMap(Coder<?> keyCoder, Coder<?> valueCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchSet(Coder<?> elementCoder) {
+        return dispatchDefault();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fc2eb0a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
index 5a2a1b6..4222304 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
@@ -278,6 +278,11 @@ public class StateSpecs {
       return visitor.bindValue(id, this, coder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchValue(coder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -342,6 +347,11 @@ public class StateSpecs {
       return visitor.bindCombining(id, this, accumCoder, combineFn);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchCombining(combineFn, accumCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -413,6 +423,14 @@ public class StateSpecs {
       return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s is for internal use only and does not support case dispatch",
+              getClass().getSimpleName()));
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -480,6 +498,11 @@ public class StateSpecs {
       return visitor.bindBag(id, this, elemCoder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchBag(elemCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -536,6 +559,11 @@ public class StateSpecs {
       return visitor.bindMap(id, this, keyCoder, valueCoder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchMap(keyCoder, valueCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -600,6 +628,11 @@ public class StateSpecs {
       return visitor.bindSet(id, this, elemCoder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchSet(elemCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -664,6 +697,14 @@ public class StateSpecs {
     }
 
     @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s is for internal use only and does not support case dispatch",
+              getClass().getSimpleName()));
+    }
+
+    @Override
     public void offerCoders(Coder[] coders) {
     }