You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/12 17:18:48 UTC

[1/2] incubator-beam git commit: Refactor StateSpec out of StateTag

Repository: incubator-beam
Updated Branches:
  refs/heads/master 142229e37 -> bc9ed7dbd


Refactor StateSpec out of StateTag


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c1ba2e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c1ba2e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c1ba2e1

Branch: refs/heads/master
Commit: 7c1ba2e1062556ac98b29f5bb4f5b75a7e7832e2
Parents: 135790b
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 20:50:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 11 20:27:12 2016 -0700

----------------------------------------------------------------------
 .../src/main/resources/beam/findbugs-filter.xml |   2 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  67 +++
 .../apache/beam/sdk/util/state/StateSpec.java   |  39 ++
 .../apache/beam/sdk/util/state/StateSpecs.java  | 452 +++++++++++++++++++
 .../apache/beam/sdk/util/state/StateTag.java    |  82 ++--
 .../apache/beam/sdk/util/state/StateTags.java   | 386 ++--------------
 6 files changed, 655 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 3c01690..c9223a7 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -116,7 +116,7 @@
     <!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
   </Match>
   <Match>
-    <Class name="org.apache.beam.sdk.util.state.StateTags$CombiningValueStateTag"/>
+    <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/>
     <Method name="equals"/>
     <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
     <!--[BEAM-421] Class doesn't override equals in superclass-->

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
new file mode 100644
index 0000000..0521e15
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+
+/**
+ * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
+ *
+ * @param <K> the type of key this binder embodies.
+ */
+public interface StateBinder<K> {
+  <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder);
+
+  <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder);
+
+  <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+      String id,
+      StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+      Coder<AccumT> accumCoder,
+      Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
+
+  <InputT, AccumT, OutputT>
+      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          String id,
+          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+          Coder<AccumT> accumCoder,
+          Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
+
+  <InputT, AccumT, OutputT>
+      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          String id,
+          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
+              combineFn);
+
+  /**
+   * Bind to a watermark {@link StateSpec}.
+   *
+   * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
+   * the returned {@link WatermarkHoldState} are to be combined.
+   */
+  <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+      String id,
+      StateSpec<? super K, WatermarkHoldState<W>> spec,
+      OutputTimeFn<? super W> outputTimeFn);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
new file mode 100644
index 0000000..4fdeefb
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A specification of a persistent state cell. This includes information necessary to encode the
+ * value and details about the intended access pattern.
+ *
+ * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
+ *            accept values of type {@code StateSpec<? super K, StateT>}.
+ * @param <StateT> The type of state being described.
+ */
+@Experimental(Kind.STATE)
+public interface StateSpec<K, StateT extends State> extends Serializable {
+
+  /**
+   * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
+   */
+  StateT bind(String id, StateBinder<? extends K> binder);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
new file mode 100644
index 0000000..db0eec6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+
+/**
+ * Static utility methods for creating {@link StateSpec} instances.
+ */
+@Experimental(Kind.STATE)
+public class StateSpecs {
+
+  private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
+
+  static {
+    STANDARD_REGISTRY.registerStandardCoders();
+  }
+
+  private StateSpecs() {}
+
+  /** Create a simple state spec for values of type {@code T}. */
+  public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) {
+    return new ValueStateSpec<>(valueCoder);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+   * {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+          Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    return combiningValueInternal(accumCoder, combineFn);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge
+   * multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link
+   * KeyedCombineFn} comes from the keyed {@link StateAccessor}.
+   */
+  public static <K, InputT, AccumT, OutputT>
+      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+          Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+    return keyedCombiningValueInternal(accumCoder, combineFn);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically
+   * merge multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link
+   * KeyedCombineFn} comes from the keyed {@link StateAccessor}, the context provided comes from the
+   * {@link StateContext}.
+   */
+  public static <K, InputT, AccumT, OutputT>
+      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+          keyedCombiningValueWithContext(
+              Coder<AccumT> accumCoder,
+              KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+    return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(
+        accumCoder, combineFn);
+  }
+
+  /**
+   * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
+   * {@code InputT}s into a single {@code OutputT}.
+   *
+   * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should
+   * only be used to initialize static values.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+          combiningValueFromInputInternal(
+              Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    try {
+      Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
+      return combiningValueInternal(accumCoder, combineFn);
+    } catch (CannotProvideCoderException e) {
+      throw new IllegalArgumentException(
+          "Unable to determine accumulator coder for "
+              + combineFn.getClass().getSimpleName()
+              + " from "
+              + inputCoder,
+          e);
+    }
+  }
+
+  private static <InputT, AccumT, OutputT>
+      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+          Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+  }
+
+  private static <K, InputT, AccumT, OutputT>
+      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+          Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+    return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+  }
+
+  /**
+   * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
+   * all the values that have been added.
+   */
+  public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) {
+    return new BagStateSpec<T>(elemCoder);
+  }
+
+  /** Create a state spec for holding the watermark. */
+  public static <W extends BoundedWindow>
+      StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal(
+          OutputTimeFn<? super W> outputTimeFn) {
+    return new WatermarkStateSpecInternal<W>(outputTimeFn);
+  }
+
+  public static <K, InputT, AccumT, OutputT>
+      StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
+          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+    if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
+      // Checked above; conversion to a bag spec depends on the provided spec being one of those
+      // created via the factory methods in this class.
+      @SuppressWarnings("unchecked")
+      KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+          (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      return typedSpec.asBagSpec();
+    } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) {
+      @SuppressWarnings("unchecked")
+      KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+          (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      return typedSpec.asBagSpec();
+    } else {
+      throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
+    }
+  }
+
+  /**
+   * A specification for a state cell holding a settable value of type {@code T}.
+   *
+   * <p>Includes the coder for {@code T}.
+   */
+  private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> {
+
+    private final Coder<T> coder;
+
+    private ValueStateSpec(Coder<T> coder) {
+      this.coder = coder;
+    }
+
+    @Override
+    public ValueState<T> bind(String id, StateBinder<?> visitor) {
+      return visitor.bindValue(id, this, coder);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof ValueStateSpec)) {
+        return false;
+      }
+
+      ValueStateSpec<?> that = (ValueStateSpec<?>) obj;
+      return Objects.equals(this.coder, that.coder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), coder);
+    }
+  }
+
+  /**
+   * A specification for a state cell that is combined according to a {@link CombineFn}.
+   *
+   * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
+   */
+  private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
+      extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
+      implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+
+    private final Coder<AccumT> accumCoder;
+    private final CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    private CombiningValueStateSpec(
+        Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+      super(accumCoder, combineFn.asKeyedFn());
+      this.combineFn = combineFn;
+      this.accumCoder = accumCoder;
+    }
+  }
+
+  /**
+   * A specification for a state cell that is combined according to a
+   * {@link KeyedCombineFnWithContext}.
+   *
+   * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
+   */
+  private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
+      implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+
+    private final Coder<AccumT> accumCoder;
+    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
+
+    protected KeyedCombiningValueWithContextStateSpec(
+        Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+      this.combineFn = combineFn;
+      this.accumCoder = accumCoder;
+    }
+
+    @Override
+    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+        String id, StateBinder<? extends K> visitor) {
+      return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) {
+        return false;
+      }
+
+      KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that =
+          (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj;
+      return Objects.equals(this.accumCoder, that.accumCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), accumCoder);
+    }
+
+    private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+      return new BagStateSpec<AccumT>(accumCoder);
+    }
+  }
+
+  /**
+   * A specification for a state cell that is combined according to a {@link KeyedCombineFn}.
+   *
+   * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
+   */
+  private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
+      implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+
+    private final Coder<AccumT> accumCoder;
+    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+    protected KeyedCombiningValueStateSpec(
+        Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+      this.keyedCombineFn = keyedCombineFn;
+      this.accumCoder = accumCoder;
+    }
+
+    @Override
+    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+        String id, StateBinder<? extends K> visitor) {
+      return visitor.bindKeyedCombiningValue(id, this, accumCoder, keyedCombineFn);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof CombiningValueStateSpec)) {
+        return false;
+      }
+
+      KeyedCombiningValueStateSpec<?, ?, ?, ?> that =
+          (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj;
+      return Objects.equals(this.accumCoder, that.accumCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), accumCoder);
+    }
+
+    private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+      return new BagStateSpec<AccumT>(accumCoder);
+    }
+  }
+
+  /**
+   * A specification for a state cell supporting for bag-like access patterns
+   * (frequent additions, occasional reads of all the values).
+   *
+   * <p>Includes the coder for the element type {@code T}</p>
+   */
+  private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> {
+
+    private final Coder<T> elemCoder;
+
+    private BagStateSpec(Coder<T> elemCoder) {
+      this.elemCoder = elemCoder;
+    }
+
+    @Override
+    public BagState<T> bind(String id, StateBinder<?> visitor) {
+      return visitor.bindBag(id, this, elemCoder);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof BagStateSpec)) {
+        return false;
+      }
+
+      BagStateSpec<?> that = (BagStateSpec<?>) obj;
+      return Objects.equals(this.elemCoder, that.elemCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), elemCoder);
+    }
+  }
+
+  /**
+   * A specification for a state cell tracking a combined watermark hold.
+   *
+   * <p>Includes the {@link OutputTimeFn} according to which the output times
+   * are combined.
+   */
+  private static class WatermarkStateSpecInternal<W extends BoundedWindow>
+      implements StateSpec<Object, WatermarkHoldState<W>> {
+
+    /**
+     * When multiple output times are added to hold the watermark, this determines how they are
+     * combined, and also the behavior when merging windows. Does not contribute to equality/hash
+     * since we have at most one watermark hold spec per computation.
+     */
+    private final OutputTimeFn<? super W> outputTimeFn;
+
+    private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
+    }
+
+    @Override
+    public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) {
+      return visitor.bindWatermark(id, this, outputTimeFn);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      // All instance of WatermarkHoldState are considered equal
+      return obj instanceof WatermarkStateSpecInternal;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+  }
+
+  /**
+   * @deprecated for migration purposes only
+   */
+  @Deprecated
+  public static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) {
+    return new StateBinder<K>() {
+      @Override
+      public <T> ValueState<T> bindValue(
+          String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) {
+        return binder.bindValue(StateTags.tagForSpec(id, spec), coder);
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) {
+        return binder.bindBag(StateTags.tagForSpec(id, spec), elemCoder);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              CombineFn<InputT, AccumT, OutputT> combineFn) {
+        return binder.bindCombiningValue(StateTags.tagForSpec(id, spec), accumCoder, combineFn);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        return binder.bindKeyedCombiningValue(
+            StateTags.tagForSpec(id, spec), accumCoder, combineFn);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
+        return binder.bindKeyedCombiningValueWithContext(
+            StateTags.tagForSpec(id, spec), accumCoder, combineFn);
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          String id,
+          StateSpec<? super K, WatermarkHoldState<W>> spec,
+          OutputTimeFn<? super W> outputTimeFn) {
+        return binder.bindWatermark(StateTags.tagForSpec(id, spec), outputTimeFn);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
index 94cba2f..feca927 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
@@ -30,8 +30,9 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 
 /**
- * An address for persistent state. This includes a unique identifier for the location, the
- * information necessary to encode the value, and details about the intended access pattern.
+ * An address and specification for a persistent state cell. This includes a unique identifier for
+ * the location, the information necessary to encode the value, and details about the intended
+ * access pattern.
  *
  * <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column
  * that has cells of type {@code StateT}.
@@ -45,53 +46,66 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 @Experimental(Kind.STATE)
 public interface StateTag<K, StateT extends State> extends Serializable {
 
+  /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
+  void appendTo(Appendable sb) throws IOException;
+
+  /**
+   * An identifier for the state cell that this tag references.
+   */
+  String getId();
+
+  /**
+   * The specification for the state stored in the referenced cell.
+   */
+  StateSpec<K, StateT> getSpec();
+
+  /**
+   * Bind this state tag. See {@link StateSpec#bind}.
+   *
+   * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now.
+   */
+  @Deprecated
+  StateT bind(StateBinder<? extends K> binder);
+
   /**
-   * Visitor for binding a {@link StateTag} and to the associated {@link State}.
+   * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
    *
    * @param <K> the type of key this binder embodies.
+   * @deprecated for migration only; runners should reference the top level {@link StateBinder}
+   * and move towards {@link StateSpec} rather than {@link StateTag}.
    */
+  @Deprecated
   public interface StateBinder<K> {
-    <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder);
+    <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder);
 
-    <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder);
+    <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder);
 
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-    bindCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn);
+    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+        Coder<AccumT> accumCoder,
+        CombineFn<InputT, AccumT, OutputT> combineFn);
 
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-    bindKeyedCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
+    <InputT, AccumT, OutputT>
+    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+        Coder<AccumT> accumCoder,
+        KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
 
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-    bindKeyedCombiningValueWithContext(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+    <InputT, AccumT, OutputT>
+    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
-        KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn);
+        KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
+            combineFn);
 
     /**
-     * Bind to a watermark {@link StateTag}.
+     * Bind to a watermark {@link StateSpec}.
      *
-     * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps
-     * added to the returned {@link WatermarkHoldState} are to be combined.
+     * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
+     * the returned {@link WatermarkHoldState} are to be combined.
      */
     <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
+        StateTag<? super K, WatermarkHoldState<W>> spec,
         OutputTimeFn<? super W> outputTimeFn);
   }
-
-  /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
-  void appendTo(Appendable sb) throws IOException;
-
-  /**
-   * Returns the user-provided name of this state cell.
-   */
-  String getId();
-
-  /**
-   * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
-   */
-  StateT bind(StateBinder<? extends K> binder);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
index b0797b6..3c12848 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
@@ -23,7 +23,6 @@ import java.io.Serializable;
 import java.util.Objects;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -61,11 +60,17 @@ public class StateTags {
     StateTag<K, StateT> asKind(StateKind kind);
   }
 
+  /** Create a state tag for the given id and spec. */
+  public static <K, StateT extends State> StateTag<K, StateT> tagForSpec(
+      String id, StateSpec<K, StateT> spec) {
+    return new SimpleStateTag<>(new StructuredId(id), spec);
+  }
+
   /**
    * Create a simple state tag for values of type {@code T}.
    */
   public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) {
-    return new ValueStateTag<>(new StructuredId(id), valueCoder);
+    return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder));
   }
 
   /**
@@ -76,7 +81,8 @@ public class StateTags {
     StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
     combiningValue(
       String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return combiningValueInternal(id, accumCoder, combineFn);
+    return new SimpleStateTag<>(
+        new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn));
   }
 
   /**
@@ -88,7 +94,8 @@ public class StateTags {
       OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
       keyedCombiningValue(String id, Coder<AccumT> accumCoder,
           KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return keyedCombiningValueInternal(id, accumCoder, combineFn);
+    return new SimpleStateTag<>(
+        new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn));
   }
 
   /**
@@ -103,10 +110,8 @@ public class StateTags {
           String id,
           Coder<AccumT> accumCoder,
           KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>(
-        new StructuredId(id),
-        accumCoder,
-        combineFn);
+    return new SimpleStateTag<>(
+        new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
   }
 
   /**
@@ -120,32 +125,8 @@ public class StateTags {
       StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
       combiningValueFromInputInternal(
           String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    try {
-      Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
-      return combiningValueInternal(id, accumCoder, combineFn);
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to determine accumulator coder for " + combineFn.getClass().getSimpleName()
-          + " from " + inputCoder, e);
-    }
-  }
-
-  private static <InputT, AccumT,
-      OutputT> StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      combiningValueInternal(
-      String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return
-        new CombiningValueStateTag<InputT, AccumT, OutputT>(
-            new StructuredId(id), accumCoder, combineFn);
-  }
-
-  private static <K, InputT, AccumT, OutputT>
-      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
-          String id,
-          Coder<AccumT> accumCoder,
-          KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>(
-        new StructuredId(id), accumCoder, combineFn);
+    return new SimpleStateTag<>(
+        new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
   }
 
   /**
@@ -153,7 +134,7 @@ public class StateTags {
    * occasionally retrieving all the values that have been added.
    */
   public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
-    return new BagStateTag<T>(new StructuredId(id), elemCoder);
+    return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder));
   }
 
   /**
@@ -161,7 +142,8 @@ public class StateTags {
    */
   public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
       watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
-    return new WatermarkStateTagInternal<W>(new StructuredId(id), outputTimeFn);
+    return new SimpleStateTag<>(
+        new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
   }
 
   /**
@@ -171,7 +153,7 @@ public class StateTags {
   public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
       StateTag<K, StateT> tag) {
     if (!(tag instanceof SystemStateTag)) {
-      throw new IllegalArgumentException("Expected subclass of StateTagBase, got " + tag);
+      throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
     }
     // Checked above
     @SuppressWarnings("unchecked")
@@ -182,21 +164,9 @@ public class StateTags {
   public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
       convertToBagTagInternal(
           StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
-    if (combiningTag instanceof KeyedCombiningValueStateTag) {
-      // Checked above; conversion to a bag tag depends on the provided tag being one of those
-      // created via the factory methods in this class.
-      @SuppressWarnings("unchecked")
-      KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT> typedTag =
-          (KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>) combiningTag;
-      return typedTag.asBagTag();
-    } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) {
-      @SuppressWarnings("unchecked")
-      KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT> typedTag =
-          (KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>) combiningTag;
-      return typedTag.asBagTag();
-    } else {
-      throw new IllegalArgumentException("Unexpected StateTag " + combiningTag);
-    }
+    return new SimpleStateTag<>(
+        new StructuredId(combiningTag.getId()),
+        StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
   }
 
   private static class StructuredId implements Serializable {
@@ -254,15 +224,24 @@ public class StateTags {
   }
 
   /**
-   * A base class that just manages the structured ids.
+   * A basic {@link StateTag} implementation that manages the structured ids.
    */
-  private abstract static class StateTagBase<K, StateT extends State>
+  private static class SimpleStateTag<K, StateT extends State>
       implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
 
-    protected final StructuredId id;
+    private final StateSpec<K, StateT> spec;
+    private final StructuredId id;
 
-    protected StateTagBase(StructuredId id) {
+    public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) {
       this.id = id;
+      this.spec = spec;
+    }
+
+    @Override
+    @Deprecated
+    public StateT bind(StateTag.StateBinder<? extends K> binder) {
+      return spec.bind(
+          this.id.getRawId(), StateSpecs.adaptTagBinder(binder));
     }
 
     @Override
@@ -271,6 +250,11 @@ public class StateTags {
     }
 
     @Override
+    public StateSpec<K, StateT> getSpec() {
+      return spec;
+    }
+
+    @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
           .add("id", id)
@@ -283,298 +267,24 @@ public class StateTags {
     }
 
     @Override
-    public abstract StateTag<K, StateT> asKind(StateKind kind);
-  }
-
-  /**
-   * A value state cell for values of type {@code T}.
-   *
-   * @param <T> the type of value being stored
-   */
-  private static class ValueStateTag<T> extends StateTagBase<Object, ValueState<T>>
-      implements StateTag<Object, ValueState<T>> {
-
-    private final Coder<T> coder;
-
-    private ValueStateTag(StructuredId id, Coder<T> coder) {
-      super(id);
-      this.coder = coder;
-    }
-
-    @Override
-    public ValueState<T> bind(StateBinder<?> visitor) {
-      return visitor.bindValue(this, coder);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof ValueStateTag)) {
-        return false;
-      }
-
-      ValueStateTag<?> that = (ValueStateTag<?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.coder, that.coder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id, coder);
-    }
-
-    @Override
-    public StateTag<Object, ValueState<T>> asKind(StateKind kind) {
-      return new ValueStateTag<T>(id.asKind(kind), coder);
-    }
-  }
-
-  /**
-   * A state cell for values that are combined according to a {@link CombineFn}.
-   *
-   * @param <InputT> the type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  private static class CombiningValueStateTag<InputT, AccumT, OutputT>
-      extends KeyedCombiningValueStateTag<Object, InputT, AccumT, OutputT>
-      implements StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>,
-      SystemStateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
-    private final Coder<AccumT> accumCoder;
-    private final CombineFn<InputT, AccumT, OutputT> combineFn;
-
-    private CombiningValueStateTag(
-        StructuredId id,
-        Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-      super(id, accumCoder, combineFn.asKeyedFn());
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-    asKind(StateKind kind) {
-      return new CombiningValueStateTag<InputT, AccumT, OutputT>(
-          id.asKind(kind), accumCoder, combineFn);
-    }
-  }
-
-  /**
-   * A state cell for values that are combined according to a {@link KeyedCombineFnWithContext}.
-   *
-   * @param <K> the type of keys
-   * @param <InputT> the type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  private static class KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>
-    extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-    implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
-    private final Coder<AccumT> accumCoder;
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
-
-    protected KeyedCombiningValueWithContextStateTag(
-        StructuredId id,
-        Coder<AccumT> accumCoder,
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-      super(id);
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
-        StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof KeyedCombiningValueWithContextStateTag)) {
-        return false;
-      }
-
-      KeyedCombiningValueWithContextStateTag<?, ?, ?, ?> that =
-          (KeyedCombiningValueWithContextStateTag<?, ?, ?, ?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.accumCoder, that.accumCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id, accumCoder);
-    }
-
-    @Override
-    public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
-        StateKind kind) {
-      return new KeyedCombiningValueWithContextStateTag<>(
-          id.asKind(kind), accumCoder, combineFn);
-    }
-
-    private StateTag<Object, BagState<AccumT>> asBagTag() {
-      return new BagStateTag<AccumT>(id, accumCoder);
-    }
-  }
-
-  /**
-   * A state cell for values that are combined according to a {@link KeyedCombineFn}.
-   *
-   * @param <K> the type of keys
-   * @param <InputT> the type of input values
-   * @param <AccumT> type of mutable accumulator values
-   * @param <OutputT> type of output values
-   */
-  private static class KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>
-      extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
-
-    private final Coder<AccumT> accumCoder;
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    protected KeyedCombiningValueStateTag(
-        StructuredId id,
-        Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      super(id);
-      this.keyedCombineFn = keyedCombineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
-        StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn);
+    public StateTag<K, StateT> asKind(StateKind kind) {
+      return new SimpleStateTag<>(id.asKind(kind), spec);
     }
 
     @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof CombiningValueStateTag)) {
+    public boolean equals(Object other) {
+      if (!(other instanceof SimpleStateTag)) {
         return false;
       }
 
-      KeyedCombiningValueStateTag<?, ?, ?, ?> that = (KeyedCombiningValueStateTag<?, ?, ?, ?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.accumCoder, that.accumCoder);
+      SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other;
+      return Objects.equals(this.getId(), otherTag.getId())
+          && Objects.equals(this.getSpec(), otherTag.getSpec());
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(getClass(), id, accumCoder);
-    }
-
-    @Override
-    public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind(
-        StateKind kind) {
-      return new KeyedCombiningValueStateTag<>(id.asKind(kind), accumCoder, keyedCombineFn);
-    }
-
-    private StateTag<Object, BagState<AccumT>> asBagTag() {
-      return new BagStateTag<AccumT>(id, accumCoder);
-    }
-  }
-
-  /**
-   * A state cell optimized for bag-like access patterns (frequent additions, occasional reads
-   * of all the values).
-   *
-   * @param <T> the type of value in the bag
-   */
-  private static class BagStateTag<T> extends StateTagBase<Object, BagState<T>>
-      implements StateTag<Object, BagState<T>>{
-
-    private final Coder<T> elemCoder;
-
-    private BagStateTag(StructuredId id, Coder<T> elemCoder) {
-      super(id);
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public BagState<T> bind(StateBinder<?> visitor) {
-      return visitor.bindBag(this, elemCoder);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof BagStateTag)) {
-        return false;
-      }
-
-      BagStateTag<?> that = (BagStateTag<?>) obj;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.elemCoder, that.elemCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id, elemCoder);
-    }
-
-    @Override
-    public StateTag<Object, BagState<T>> asKind(StateKind kind) {
-      return new BagStateTag<>(id.asKind(kind), elemCoder);
-    }
-  }
-
-  private static class WatermarkStateTagInternal<W extends BoundedWindow>
-      extends StateTagBase<Object, WatermarkHoldState<W>> {
-
-    /**
-     * When multiple output times are added to hold the watermark, this determines how they are
-     * combined, and also the behavior when merging windows. Does not contribute to equality/hash
-     * since we have at most one watermark hold tag per computation.
-     */
-    private final OutputTimeFn<? super W> outputTimeFn;
-
-    private WatermarkStateTagInternal(StructuredId id, OutputTimeFn<? super W> outputTimeFn) {
-      super(id);
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public WatermarkHoldState<W> bind(StateBinder<?> visitor) {
-      return visitor.bindWatermark(this, outputTimeFn);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof WatermarkStateTagInternal)) {
-        return false;
-      }
-
-      WatermarkStateTagInternal<?> that = (WatermarkStateTagInternal<?>) obj;
-      return Objects.equals(this.id, that.id);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), id);
-    }
-
-    @Override
-    public StateTag<Object, WatermarkHoldState<W>> asKind(StateKind kind) {
-      return new WatermarkStateTagInternal<W>(id.asKind(kind), outputTimeFn);
+      return Objects.hash(getClass(), this.getId(), this.getSpec());
     }
   }
 }


[2/2] incubator-beam git commit: This closes #1044

Posted by ke...@apache.org.
This closes #1044


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc9ed7db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc9ed7db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc9ed7db

Branch: refs/heads/master
Commit: bc9ed7dbd9da9f7addc365ad511b106cfcc69b01
Parents: 142229e 7c1ba2e
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 12 10:17:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 12 10:17:28 2016 -0700

----------------------------------------------------------------------
 .../src/main/resources/beam/findbugs-filter.xml |   2 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  67 +++
 .../apache/beam/sdk/util/state/StateSpec.java   |  39 ++
 .../apache/beam/sdk/util/state/StateSpecs.java  | 452 +++++++++++++++++++
 .../apache/beam/sdk/util/state/StateTag.java    |  82 ++--
 .../apache/beam/sdk/util/state/StateTags.java   | 386 ++--------------
 6 files changed, 655 insertions(+), 373 deletions(-)
----------------------------------------------------------------------