You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/26 17:47:47 UTC

[4/7] beam git commit: Add case dispatch to StateSpec

Add case dispatch to StateSpec

This is different than a StateBinder: for a binder, the id is needed and
the StateSpec controls the return type. For case dispatch, the
dispatcher controls the type and it should just be reading the spec,
which does not require the id. Eventually, StateBinder could be removed
in favor of StateSpec.Cases<Function<String, StateT>>.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fc2eb0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fc2eb0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fc2eb0a

Branch: refs/heads/master
Commit: 8fc2eb0aeee9c3bdeaf93897e5e8aa4bb98b98de
Parents: 9497e5e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 07:27:52 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 26 10:22:37 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/state/StateSpec.java    | 53 ++++++++++++++++++++
 .../org/apache/beam/sdk/state/StateSpecs.java   | 41 +++++++++++++++
 2 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8fc2eb0a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
index b0412bf..0443f25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
 
 /**
  * A specification of a persistent state cell. This includes information necessary to encode the
@@ -43,6 +44,14 @@ public interface StateSpec<StateT extends State> extends Serializable {
   /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
+   * <p>Perform case analysis on this {@link StateSpec} using the provided {@link Cases}.
+   */
+  @Internal
+  <ResultT> ResultT match(Cases<ResultT> cases);
+
+  /**
+   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
+   *
    * <p>Given {code coders} are inferred from type arguments defined for this class. Coders which
    * are already set should take precedence over offered coders.
    *
@@ -60,4 +69,48 @@ public interface StateSpec<StateT extends State> extends Serializable {
    */
   @Internal
   void finishSpecifying();
+
+  /**
+   * Cases for doing a "switch" on the type of {@link StateSpec}.
+   */
+  interface Cases<ResultT> {
+    ResultT dispatchValue(Coder<?> valueCoder);
+    ResultT dispatchBag(Coder<?> elementCoder);
+    ResultT dispatchCombining(Combine.CombineFn<?, ?, ?> combineFn, Coder<?> accumCoder);
+    ResultT dispatchMap(Coder<?> keyCoder, Coder<?> valueCoder);
+    ResultT dispatchSet(Coder<?> elementCoder);
+
+    /**
+     * A base class for a visitor with a default method for cases it is not interested in.
+     */
+    abstract class WithDefault<ResultT> implements Cases<ResultT> {
+
+      protected abstract ResultT dispatchDefault();
+
+      @Override
+      public ResultT dispatchValue(Coder<?> valueCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchBag(Coder<?> elementCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchCombining(Combine.CombineFn<?, ?, ?> combineFn, Coder<?> accumCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchMap(Coder<?> keyCoder, Coder<?> valueCoder) {
+        return dispatchDefault();
+      }
+
+      @Override
+      public ResultT dispatchSet(Coder<?> elementCoder) {
+        return dispatchDefault();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fc2eb0a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
index 5a2a1b6..4222304 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
@@ -278,6 +278,11 @@ public class StateSpecs {
       return visitor.bindValue(id, this, coder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchValue(coder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -342,6 +347,11 @@ public class StateSpecs {
       return visitor.bindCombining(id, this, accumCoder, combineFn);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchCombining(combineFn, accumCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -413,6 +423,14 @@ public class StateSpecs {
       return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s is for internal use only and does not support case dispatch",
+              getClass().getSimpleName()));
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -480,6 +498,11 @@ public class StateSpecs {
       return visitor.bindBag(id, this, elemCoder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchBag(elemCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -536,6 +559,11 @@ public class StateSpecs {
       return visitor.bindMap(id, this, keyCoder, valueCoder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchMap(keyCoder, valueCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -600,6 +628,11 @@ public class StateSpecs {
       return visitor.bindSet(id, this, elemCoder);
     }
 
+    @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      return cases.dispatchSet(elemCoder);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void offerCoders(Coder[] coders) {
@@ -664,6 +697,14 @@ public class StateSpecs {
     }
 
     @Override
+    public <ResultT> ResultT match(Cases<ResultT> cases) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s is for internal use only and does not support case dispatch",
+              getClass().getSimpleName()));
+    }
+
+    @Override
     public void offerCoders(Coder[] coders) {
     }