You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:19 UTC

[1/6] beam git commit: Make DoFnSignatures robust to StateSpec subclasses

Repository: beam
Updated Branches:
  refs/heads/master eec903fb5 -> b40b26501


Make DoFnSignatures robust to StateSpec subclasses


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/190422ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/190422ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/190422ca

Branch: refs/heads/master
Commit: 190422cac34732ad722d58d2fecf03571f5f08e9
Parents: 8fe59c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 11:54:07 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../sdk/transforms/reflect/DoFnSignatures.java  | 25 +++++++++++++++-----
 1 file changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/190422ca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 7beeeb8..666c7f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -1188,7 +1188,8 @@ public class DoFnSignatures {
       }
 
       Class<?> stateSpecRawType = field.getType();
-      if (!(stateSpecRawType.equals(StateSpec.class))) {
+      if (!(TypeDescriptor.of(stateSpecRawType)
+          .isSubtypeOf(TypeDescriptor.of(StateSpec.class)))) {
         errors.throwIllegalArgument(
                 "%s annotation on non-%s field [%s] that has class %s",
             DoFn.StateId.class.getSimpleName(),
@@ -1208,14 +1209,26 @@ public class DoFnSignatures {
 
       Type stateSpecType = field.getGenericType();
 
+      // A type descriptor for whatever type the @StateId-annotated class has, which
+      // must be some subtype of StateSpec
+      TypeDescriptor<? extends StateSpec<?>> stateSpecSubclassTypeDescriptor =
+          (TypeDescriptor) TypeDescriptor.of(stateSpecType);
+
+      // A type descriptor for StateSpec, with the generic type parameters filled
+      // in according to the specialization of the subclass (or just straight params)
+      TypeDescriptor<StateSpec<?>> stateSpecTypeDescriptor =
+          (TypeDescriptor)
+      stateSpecSubclassTypeDescriptor.getSupertype(StateSpec.class);
+
+      // The type of the state, which may still have free type variables from the
+      // context
+      Type unresolvedStateType =
+          ((ParameterizedType) stateSpecTypeDescriptor.getType()).getActualTypeArguments()[0];
+
       // By static typing this is already a well-formed State subclass
       TypeDescriptor<? extends State> stateType =
           (TypeDescriptor<? extends State>)
-              TypeDescriptor.of(fnClazz)
-                  .resolveType(
-                      TypeDescriptor.of(
-                              ((ParameterizedType) stateSpecType).getActualTypeArguments()[1])
-                          .getType());
+              TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType);
 
       declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType));
     }


[5/6] beam git commit: Simplify type parameters of StateSpec and related

Posted by ke...@apache.org.
Simplify type parameters of StateSpec and related

Before this change, almost all uses of state had a type variable that
existed only to support the esoteric use of a KeyedCombineFn in
a state cell. KeyedCombineFn is now gone, so the key is no longer
required.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fe59c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fe59c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fe59c35

Branch: refs/heads/master
Commit: 8fe59c3555e65c21c0a0bbaa5a0ba9eac39316dd
Parents: eec903f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 20 20:46:37 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   4 +-
 .../translation/utils/ApexStateInternals.java   |  42 +++--
 .../apex/translation/utils/NoOpStepContext.java |   2 +-
 .../translation/utils/StateInternalsProxy.java  |   6 +-
 .../utils/ApexStateInternalsTest.java           |  12 +-
 .../construction/PTransformMatchersTest.java    |   2 +-
 .../beam/runners/core/BaseExecutionContext.java |   2 +-
 .../beam/runners/core/ExecutionContext.java     |   2 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   2 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   2 +-
 .../runners/core/InMemoryStateInternals.java    |  36 ++--
 .../runners/core/MergingActiveWindowSet.java    |   4 +-
 .../beam/runners/core/MergingStateAccessor.java |   2 +-
 .../apache/beam/runners/core/NonEmptyPanes.java |   2 +-
 .../beam/runners/core/PaneInfoTracker.java      |   2 +-
 .../runners/core/ReduceFnContextFactory.java    |  37 ++--
 .../beam/runners/core/ReduceFnRunner.java       |   4 +-
 .../beam/runners/core/SideInputHandler.java     |  14 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |   8 +-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   2 +-
 .../beam/runners/core/SplittableParDo.java      |   8 +-
 .../apache/beam/runners/core/StateAccessor.java |   2 +-
 .../beam/runners/core/StateInternals.java       |   8 +-
 .../runners/core/StateInternalsFactory.java     |   2 +-
 .../apache/beam/runners/core/StateMerging.java  |  16 +-
 .../apache/beam/runners/core/StateTable.java    |  10 +-
 .../org/apache/beam/runners/core/StateTag.java  |  28 ++-
 .../org/apache/beam/runners/core/StateTags.java |  70 ++++----
 .../beam/runners/core/StatefulDoFnRunner.java   |   6 +-
 .../beam/runners/core/SystemReduceFn.java       |   8 +-
 .../core/TestInMemoryStateInternals.java        |   6 +-
 .../apache/beam/runners/core/WatermarkHold.java |   8 +-
 .../beam/runners/core/WindowingInternals.java   |   2 +-
 .../AfterDelayFromFirstElementStateMachine.java |   2 +-
 .../core/triggers/AfterPaneStateMachine.java    |   2 +-
 .../TriggerStateMachineContextFactory.java      |  12 +-
 .../triggers/TriggerStateMachineRunner.java     |   2 +-
 .../core/GroupAlsoByWindowsProperties.java      |  10 +-
 .../core/InMemoryStateInternalsTest.java        |  16 +-
 .../core/MergingActiveWindowSetTest.java        |   2 +-
 .../beam/runners/core/ReduceFnTester.java       |  18 +-
 .../beam/runners/core/SplittableParDoTest.java  |   2 +-
 .../apache/beam/runners/core/StateTagTest.java  |  62 +++----
 .../runners/core/StatefulDoFnRunnerTest.java    |   4 +-
 .../CopyOnAccessInMemoryStateInternals.java     | 118 ++++++-------
 .../runners/direct/DirectExecutionContext.java  |  15 +-
 .../beam/runners/direct/EvaluationContext.java  |   8 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../beam/runners/direct/ParDoEvaluator.java     |   2 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   5 +-
 .../runners/direct/StepTransformResult.java     |   4 +-
 .../beam/runners/direct/TransformResult.java    |   2 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java | 106 +++++------
 .../beam/runners/direct/DirectRunnerTest.java   |   1 +
 .../runners/direct/EvaluationContextTest.java   |  12 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |  10 +-
 .../functions/FlinkNoOpStepContext.java         |   2 +-
 .../functions/FlinkStatefulDoFnFunction.java    |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   6 +-
 .../streaming/SplittableDoFnOperator.java       |   4 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../state/FlinkBroadcastStateInternals.java     |  45 +++--
 .../state/FlinkKeyGroupStateInternals.java      |  29 ++-
 .../state/FlinkSplitStateInternals.java         |  29 ++-
 .../streaming/state/FlinkStateInternals.java    |  48 ++---
 .../flink/streaming/DoFnOperatorTest.java       |   4 +-
 .../FlinkBroadcastStateInternalsTest.java       |   6 +-
 .../FlinkKeyGroupStateInternalsTest.java        |   2 +-
 .../streaming/FlinkSplitStateInternalsTest.java |   2 +-
 .../streaming/FlinkStateInternalsTest.java      |  12 +-
 .../BatchStatefulParDoOverridesTest.java        |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../spark/stateful/SparkStateInternals.java     |  44 +++--
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../spark/translation/TranslationUtils.java     |   2 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   8 +-
 .../beam/sdk/transforms/GroupIntoBatches.java   |   6 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   2 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  28 +--
 .../apache/beam/sdk/util/state/StateSpec.java   |  15 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  92 +++++-----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 177 +++++++++----------
 .../transforms/reflect/DoFnInvokersTest.java    |   2 +-
 .../transforms/reflect/DoFnSignaturesTest.java  |  26 +--
 .../beam/fn/harness/fake/FakeStepContext.java   |   2 +-
 88 files changed, 692 insertions(+), 701 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index b66d818..d5dd0dd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -94,7 +94,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private StateInternalsProxy<?> currentKeyStateInternals;
   private final ApexTimerInternals<Object> currentKeyTimerInternals;
 
-  private final StateInternals<Void> sideInputStateInternals;
+  private final StateInternals sideInputStateInternals;
   private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
   private LongMin pushedBackWatermark = new LongMin();
   private long currentInputWatermark = Long.MIN_VALUE;
@@ -327,7 +327,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     NoOpStepContext stepContext = new NoOpStepContext() {
 
       @Override
-      public StateInternals<?> stateInternals() {
+      public StateInternals stateInternals() {
         return currentKeyStateInternals;
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index e682894..4300567 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -65,7 +65,7 @@ import org.joda.time.Instant;
  * <p>For fields that need to be serialized, use {@link ApexStateInternalsFactory}
  * or {@link StateInternalsProxy}
  */
-public class ApexStateInternals<K> implements StateInternals<K> {
+public class ApexStateInternals<K> implements StateInternals {
   private final K key;
   private final Table<String, String, byte[]> stateTable;
 
@@ -80,46 +80,44 @@ public class ApexStateInternals<K> implements StateInternals<K> {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
     return state(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
-    return address.bind(new ApexStateBinder(key, namespace, address, c));
+      StateNamespace namespace, StateTag<T> address, final StateContext<?> c) {
+    return address.bind(new ApexStateBinder(namespace, address, c));
   }
 
   /**
    * A {@link StateBinder} that returns {@link State} wrappers for serialized state.
    */
-  private class ApexStateBinder implements StateBinder<K> {
-    private final K key;
+  private class ApexStateBinder implements StateBinder {
     private final StateNamespace namespace;
     private final StateContext<?> c;
 
-    private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address,
+    private ApexStateBinder(StateNamespace namespace, StateTag<?> address,
         StateContext<?> c) {
-      this.key = key;
       this.namespace = namespace;
       this.c = c;
     }
 
     @Override
     public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+        StateTag<ValueState<T>> address, Coder<T> coder) {
       return new ApexValueState<>(namespace, address, coder);
     }
 
     @Override
     public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+        final StateTag<BagState<T>> address, Coder<T> elemCoder) {
       return new ApexBagState<>(namespace, address, elemCoder);
     }
 
     @Override
     public <T> SetState<T> bindSet(
-        StateTag<? super K, SetState<T>> address,
+        StateTag<SetState<T>> address,
         Coder<T> elemCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", SetState.class.getSimpleName()));
@@ -127,7 +125,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
 
     @Override
     public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", MapState.class.getSimpleName()));
@@ -136,7 +134,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
       return new ApexCombiningState<>(
@@ -149,8 +147,8 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
+    public WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
     }
@@ -158,7 +156,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
       return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -167,12 +165,12 @@ public class ApexStateInternals<K> implements StateInternals<K> {
 
   private class AbstractState<T> {
     protected final StateNamespace namespace;
-    protected final StateTag<?, ? extends State> address;
+    protected final StateTag<? extends State> address;
     protected final Coder<T> coder;
 
     private AbstractState(
         StateNamespace namespace,
-        StateTag<?, ? extends State> address,
+        StateTag<? extends State> address,
         Coder<T> coder) {
       this.namespace = namespace;
       this.address = address;
@@ -233,7 +231,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
 
     private ApexValueState(
         StateNamespace namespace,
-        StateTag<?, ValueState<T>> address,
+        StateTag<ValueState<T>> address,
         Coder<T> coder) {
       super(namespace, address, coder);
     }
@@ -261,7 +259,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
 
     public ApexWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState> address,
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       super(namespace, address, InstantCoder.of());
       this.timestampCombiner = timestampCombiner;
@@ -312,7 +310,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     private final CombineFn<InputT, AccumT, OutputT> combineFn;
 
     private ApexCombiningState(StateNamespace namespace,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         K key, CombineFn<InputT, AccumT, OutputT> combineFn) {
       super(namespace, address, coder);
@@ -376,7 +374,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
   private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> {
     private ApexBagState(
         StateNamespace namespace,
-        StateTag<?, BagState<T>> address,
+        StateTag<BagState<T>> address,
         Coder<T> coder) {
       super(namespace, address, ListCoder.of(coder));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index cc64c7c..721eecd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -60,7 +60,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab
   }
 
   @Override
-  public StateInternals<?> stateInternals() {
+  public StateInternals stateInternals() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
index 1f28364..746be2f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.util.state.StateContext;
  * @param <K>
  */
 @DefaultSerializer(JavaSerializer.class)
-public class StateInternalsProxy<K> implements StateInternals<K>, Serializable {
+public class StateInternalsProxy<K> implements StateInternals, Serializable {
 
   private final StateInternalsFactory<K> factory;
   private transient K currentKey;
@@ -55,12 +55,12 @@ public class StateInternalsProxy<K> implements StateInternals<K>, Serializable {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
     return factory.stateInternalsForKey(currentKey).state(namespace, address);
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address,
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address,
       StateContext<?> c) {
     return factory.stateInternalsForKey(currentKey).state(namespace, address, c);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 225b654..8b48a74 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -58,19 +58,19 @@ public class ApexStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState>
+  private static final StateTag<WatermarkHoldState>
       WATERMARK_EARLIEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   private ApexStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 9754bb3..33ba80c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -169,7 +169,7 @@ public class PTransformMatchersTest implements Serializable {
         private final String stateId = "mystate";
 
         @StateId(stateId)
-        private final StateSpec<Object, ValueState<Integer>> intState =
+        private final StateSpec<ValueState<Integer>> intState =
             StateSpecs.value(VarIntCoder.of());
 
         @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index cc7b574..aa1474c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -165,7 +165,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
     }
 
     @Override
-    public abstract StateInternals<?> stateInternals();
+    public abstract StateInternals stateInternals();
 
     @Override
     public abstract TimerInternals timerInternals();

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index ecd30c0..c0d01ae 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -92,7 +92,7 @@ public interface ExecutionContext {
         Coder<W> windowCoder)
             throws IOException;
 
-    StateInternals<?> stateInternals();
+    StateInternals stateInternals();
 
     TimerInternals timerInternals();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 9e66f07..311ed1c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -57,7 +57,7 @@ public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends
     InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
     timerInternals.advanceProcessingTime(Instant.now());
     timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
         new ReduceFnRunner<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 651458f..34afa5d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -63,7 +63,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
     K key = keyedWorkItem.key();
     TimerInternals timerInternals = c.windowingInternals().timerInternals();
-    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
         new ReduceFnRunner<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index de4ac29..c553dbf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -114,7 +114,7 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
     KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
 
     K key = keyedWorkItem.key();
-    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
     TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 2c02282..199ce41 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -53,7 +53,7 @@ import org.joda.time.Instant;
  * and for running tests that need state.
  */
 @Experimental(Kind.STATE)
-public class InMemoryStateInternals<K> implements StateInternals<K> {
+public class InMemoryStateInternals<K> implements StateInternals {
 
   public static <K> InMemoryStateInternals<K> forKey(K key) {
     return new InMemoryStateInternals<>(key);
@@ -79,10 +79,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     T copy();
   }
 
-  protected final StateTable<K> inMemoryState = new StateTable<K>() {
+  protected final StateTable inMemoryState = new StateTable() {
     @Override
-    protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) {
-      return new InMemoryStateBinder<K>(key, c);
+    protected StateBinder binderForNamespace(StateNamespace namespace, StateContext<?> c) {
+      return new InMemoryStateBinder(c);
     }
   };
 
@@ -99,48 +99,46 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
     return inMemoryState.get(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
+      StateNamespace namespace, StateTag<T> address, final StateContext<?> c) {
     return inMemoryState.get(namespace, address, c);
   }
 
   /**
    * A {@link StateBinder} that returns In Memory {@link State} objects.
    */
-  public static class InMemoryStateBinder<K> implements StateBinder<K> {
-    private final K key;
+  public static class InMemoryStateBinder implements StateBinder {
     private final StateContext<?> c;
 
-    public InMemoryStateBinder(K key, StateContext<?> c) {
-      this.key = key;
+    public InMemoryStateBinder(StateContext<?> c) {
       this.c = c;
     }
 
     @Override
     public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+        StateTag<ValueState<T>> address, Coder<T> coder) {
       return new InMemoryValue<T>();
     }
 
     @Override
     public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+        final StateTag<BagState<T>> address, Coder<T> elemCoder) {
       return new InMemoryBag<T>();
     }
 
     @Override
-    public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+    public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
       return new InMemorySet<>();
     }
 
     @Override
     public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
       return new InMemoryMap<>();
     }
@@ -148,23 +146,23 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
       return new InMemoryCombiningState<>(combineFn);
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
+    public WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
-      return new InMemoryWatermarkHold<W>(timestampCombiner);
+      return new InMemoryWatermarkHold(timestampCombiner);
     }
 
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
       return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
index b4e864c..2faedbb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
@@ -67,10 +67,10 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
    */
   private final ValueState<Map<W, Set<W>>> valueState;
 
-  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) {
+  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals state) {
     this.windowFn = windowFn;
 
-    StateTag<Object, ValueState<Map<W, Set<W>>>> tag =
+    StateTag<ValueState<Map<W, Set<W>>>> tag =
         StateTags.makeSystemTagInternal(StateTags.value(
             "tree", MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder()))));
     valueState = state.state(StateNamespaces.global(), tag);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
index e948650..5ffb9a2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
@@ -37,5 +37,5 @@ public interface MergingStateAccessor<K, W extends BoundedWindow>
    * are known to have state.
    */
   <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-      StateTag<? super K, StateT> address);
+      StateTag<StateT> address);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 3e875c2..06dcc9c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
   private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
       extends NonEmptyPanes<K, W> {
 
-    private static final StateTag<Object, CombiningState<Long, long[], Long>>
+    private static final StateTag<CombiningState<Long, long[], Long>>
         PANE_ADDITIONS_TAG =
         StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
             "count", VarLongCoder.of(), Sum.ofLongs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 4cf4d67..66b3960 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -43,7 +43,7 @@ public class PaneInfoTracker {
   }
 
   @VisibleForTesting
-  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
+  static final StateTag<ValueState<PaneInfo>> PANE_INFO_TAG =
       StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
 
   public void clear(StateAccessor<?> state) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 8493474..3031ebf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -51,7 +51,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
   private final K key;
   private final ReduceFn<K, InputT, OutputT, W> reduceFn;
   private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateInternals<K> stateInternals;
+  private final StateInternals stateInternals;
   private final ActiveWindowSet<W> activeWindows;
   private final TimerInternals timerInternals;
   private final SideInputReader sideInputReader;
@@ -61,7 +61,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
       K key,
       ReduceFn<K, InputT, OutputT, W> reduceFn,
       WindowingStrategy<?, W> windowingStrategy,
-      StateInternals<K> stateInternals,
+      StateInternals stateInternals,
       ActiveWindowSet<W> activeWindows,
       TimerInternals timerInternals,
       SideInputReader sideInputReader,
@@ -165,11 +165,15 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
     protected final StateContext<W> context;
     protected final StateNamespace windowNamespace;
     protected final Coder<W> windowCoder;
-    protected final StateInternals<K> stateInternals;
+    protected final StateInternals stateInternals;
     protected final StateStyle style;
 
-    public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
+    public StateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals stateInternals,
+        StateContext<W> context,
+        StateStyle style) {
 
       this.activeWindows = activeWindows;
       this.windowCoder = windowCoder;
@@ -196,7 +200,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+    public <StateT extends State> StateT access(StateTag<StateT> address) {
       switch (style) {
         case DIRECT:
           return stateInternals.state(windowNamespace(), address, context);
@@ -212,8 +216,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
       extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
     private final Collection<W> activeToBeMerged;
 
-    public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
+    public MergingStateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals stateInternals,
+        StateStyle style,
+        Collection<W> activeToBeMerged,
         W mergeResult) {
       super(activeWindows, windowCoder, stateInternals,
           stateContextForWindowOnly(mergeResult), style);
@@ -221,7 +229,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
     }
 
     @Override
-    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+    public <StateT extends State> StateT access(StateTag<StateT> address) {
       switch (style) {
         case DIRECT:
           return stateInternals.state(windowNamespace(), address, context);
@@ -237,7 +245,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
     @Override
     public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
+        StateTag<StateT> address) {
       ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
       for (W mergingWindow : activeToBeMerged) {
         StateNamespace namespace = null;
@@ -258,8 +266,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
   static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
       extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
-    public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
-        StateInternals<K> stateInternals, W window) {
+    public PremergingStateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals stateInternals,
+        W window) {
       super(activeWindows, windowCoder, stateInternals,
           stateContextForWindowOnly(window), StateStyle.RENAMED);
     }
@@ -270,7 +281,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
     @Override
     public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super K, StateT> address) {
+        StateTag<StateT> address) {
       ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
       for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) {
         StateT stateForWindow =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 08014ec..d3dc067 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -108,7 +108,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
 
   private final OutputWindowedValue<KV<K, OutputT>> outputter;
 
-  private final StateInternals<K> stateInternals;
+  private final StateInternals stateInternals;
 
   private final Counter droppedDueToClosedWindow;
 
@@ -214,7 +214,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       K key,
       WindowingStrategy<?, W> windowingStrategy,
       ExecutableTriggerStateMachine triggerStateMachine,
-      StateInternals<K> stateInternals,
+      StateInternals stateInternals,
       TimerInternals timerInternals,
       OutputWindowedValue<KV<K, OutputT>> outputter,
       SideInputReader sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 26e920a..5c67148 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -61,7 +61,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
    * be keep locally but if side inputs are broadcast to all parallel operators then all will
    * have the same view of the state.
    */
-  private final StateInternals<Void> stateInternals;
+  private final StateInternals stateInternals;
 
   /**
    * A state tag for each side input that we handle. The state is used to track
@@ -70,7 +70,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
   private final Map<
       PCollectionView<?>,
       StateTag<
-          Object,
           CombiningState<
                         BoundedWindow,
                         Set<BoundedWindow>,
@@ -81,7 +80,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
    */
   private final Map<
       PCollectionView<?>,
-      StateTag<Object, ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+      StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
 
   /**
    * Creates a new {@code SideInputHandler} for the given side inputs that uses
@@ -89,7 +88,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
    */
   public SideInputHandler(
       Collection<PCollectionView<?>> sideInputs,
-      StateInternals<Void> stateInternals) {
+      StateInternals stateInternals) {
     this.sideInputs = sideInputs;
     this.stateInternals = stateInternals;
     this.availableWindowsTags = new HashMap<>();
@@ -105,7 +104,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
               .windowCoder();
 
       StateTag<
-          Object,
           CombiningState<
                         BoundedWindow,
                         Set<BoundedWindow>,
@@ -117,7 +115,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
       availableWindowsTags.put(sideInput, availableTag);
 
       Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
-      StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+      StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
           StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
       sideInputContentsTags.put(sideInput, stateTag);
     }
@@ -145,7 +143,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
       inputWithReifiedWindows.add(value.withValue(e));
     }
 
-    StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
         sideInputContentsTags.get(sideInput);
 
     for (BoundedWindow window: value.getWindows()) {
@@ -175,7 +173,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
             .getWindowFn()
             .windowCoder();
 
-    StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
         sideInputContentsTags.get(sideInput);
 
     ValueState<Iterable<WindowedValue<?>>> state =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 1865d54..edce9a2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -614,8 +614,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     @Override
     public State state(String stateId) {
       try {
-        StateSpec<?, ?> spec =
-            (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
+        StateSpec<?> spec =
+            (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn);
         return stepContext
             .stateInternals()
             .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
@@ -726,8 +726,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     @Override
     public State state(String stateId) {
       try {
-        StateSpec<?, ?> spec =
-            (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
+        StateSpec<?> spec =
+            (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn);
         return stepContext
             .stateInternals()
             .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index b8db491..b5f8f45 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -489,7 +489,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
         }
 
         @Override
-        public StateInternals<?> stateInternals() {
+        public StateInternals stateInternals() {
           return context.stepContext.stateInternals();
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 0fa6f76..94f5f35 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -353,7 +353,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
      * the input watermark when the first {@link DoFn.ProcessElement} call for this element
      * completes.
      */
-    private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag =
+    private static final StateTag<WatermarkHoldState> watermarkHoldTag =
         StateTags.makeSystemTagInternal(
             StateTags.<GlobalWindow>watermarkStateInternal(
                 "hold", TimestampCombiner.LATEST));
@@ -363,13 +363,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
      * DoFn.ProcessElement} call and read during subsequent calls in response to timer firings, when
      * the original element is no longer available.
      */
-    private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag;
+    private final StateTag<ValueState<WindowedValue<InputT>>> elementTag;
 
     /**
      * The state cell containing a restriction representing the unprocessed part of work for this
      * element.
      */
-    private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
+    private StateTag<ValueState<RestrictionT>> restrictionTag;
 
     private final DoFn<InputT, OutputT> fn;
     private final Coder<InputT> elementCoder;
@@ -453,7 +453,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     @ProcessElement
     public void processElement(final ProcessContext c) {
       String key = c.element().key();
-      StateInternals<String> stateInternals =
+      StateInternals stateInternals =
           stateInternalsFactory.stateInternalsForKey(key);
       TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
index 87353f2..eda896b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
@@ -34,5 +34,5 @@ public interface StateAccessor<K> {
    * <p>Never accounts for merged windows. When windows are merged, any state accessed via
    * this method must be eagerly combined and written into the result window.
    */
-  <StateT extends State> StateT access(StateTag<? super K, StateT> address);
+  <StateT extends State> StateT access(StateTag<StateT> address);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
index e6440bf..c2e9412 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
@@ -40,20 +40,20 @@ import org.apache.beam.sdk.util.state.StateContext;
  * used directly, and is highly likely to change.
  */
 @Experimental(Kind.STATE)
-public interface StateInternals<K> {
+public interface StateInternals {
 
   /** The key for this {@link StateInternals}. */
-  K getKey();
+  Object getKey();
 
   /**
    * Return the state associated with {@code address} in the specified {@code namespace}.
    */
-  <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address);
+  <T extends State> T state(StateNamespace namespace, StateTag<T> address);
 
   /**
    * Return the state associated with {@code address} in the specified {@code namespace}
    * with the {@link StateContext}.
    */
   <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c);
+      StateNamespace namespace, StateTag<T> address, StateContext<?> c);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
index ea7d742..c756364 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
@@ -31,5 +31,5 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 public interface StateInternalsFactory<K> {
 
   /** Returns {@link StateInternals} for the provided key. */
-  StateInternals<K> stateInternalsForKey(K key);
+  StateInternals stateInternalsForKey(K key);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index ce37fd3..f6b9103 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -42,7 +42,7 @@ public class StateMerging {
    * in {@code context}.
    */
   public static <K, StateT extends State, W extends BoundedWindow> void clear(
-      MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) {
+      MergingStateAccessor<K, W> context, StateTag<StateT> address) {
     for (StateT state : context.accessInEachMergingWindow(address).values()) {
       state.clear();
     }
@@ -54,7 +54,7 @@ public class StateMerging {
    * blindly append to.
    */
   public static <K, T, W extends BoundedWindow> void prefetchBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
+      MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
     Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
     if (map.isEmpty()) {
       // Nothing to prefetch.
@@ -73,7 +73,7 @@ public class StateMerging {
    * Merge all bag state in {@code address} across all windows under merge.
    */
   public static <K, T, W extends BoundedWindow> void mergeBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
+      MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
     mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address));
   }
 
@@ -116,7 +116,7 @@ public class StateMerging {
    * Merge all set state in {@code address} across all windows under merge.
    */
   public static <K, T, W extends BoundedWindow> void mergeSets(
-      MergingStateAccessor<K, W> context, StateTag<? super K, SetState<T>> address) {
+      MergingStateAccessor<K, W> context, StateTag<SetState<T>> address) {
     mergeSets(context.accessInEachMergingWindow(address).values(), context.access(address));
   }
 
@@ -161,7 +161,7 @@ public class StateMerging {
    */
   public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void
       prefetchCombiningValues(MergingStateAccessor<K, W> context,
-          StateTag<? super K, StateT> address) {
+          StateTag<StateT> address) {
     for (StateT state : context.accessInEachMergingWindow(address).values()) {
       prefetchRead(state);
     }
@@ -172,7 +172,7 @@ public class StateMerging {
    */
   public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) {
+      StateTag<CombiningState<InputT, AccumT, OutputT>> address) {
     mergeCombiningValues(
         context.accessInEachMergingWindow(address).values(), context.access(address));
   }
@@ -218,7 +218,7 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void prefetchWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState> address) {
+      StateTag<WatermarkHoldState> address) {
     Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
     WatermarkHoldState result = context.access(address);
     if (map.isEmpty()) {
@@ -250,7 +250,7 @@ public class StateMerging {
    */
   public static <K, W extends BoundedWindow> void mergeWatermarks(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState> address,
+      StateTag<WatermarkHoldState> address,
       W mergeResult) {
     mergeWatermarks(
         context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
index d2511c9..1bf4ff5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.util.state.StateContext;
 /**
  * Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code State} instance.
  */
-public abstract class StateTable<K> {
+public abstract class StateTable {
 
-  private final Table<StateNamespace, StateTag<? super K, ?>, State> stateTable =
+  private final Table<StateNamespace, StateTag<?>, State> stateTable =
       HashBasedTable.create();
 
   /**
@@ -39,7 +39,7 @@ public abstract class StateTable<K> {
    * already present in this {@link StateTable}.
    */
   public <StateT extends State> StateT get(
-      StateNamespace namespace, StateTag<? super K, StateT> tag, StateContext<?> c) {
+      StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) {
     State storage = stateTable.get(namespace, tag);
     if (storage != null) {
       @SuppressWarnings("unchecked")
@@ -68,7 +68,7 @@ public abstract class StateTable<K> {
     return stateTable.containsRow(namespace);
   }
 
-  public Map<StateTag<? super K, ?>, State> getTagsInUse(StateNamespace namespace) {
+  public Map<StateTag<?>, State> getTagsInUse(StateNamespace namespace) {
     return stateTable.row(namespace);
   }
 
@@ -80,5 +80,5 @@ public abstract class StateTable<K> {
    * Provide the {@code StateBinder} to use for creating {@code Storage} instances
    * in the specified {@code namespace}.
    */
-  protected abstract StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c);
+  protected abstract StateBinder binderForNamespace(StateNamespace namespace, StateContext<?> c);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index aaeecf0..38e9dea 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -46,12 +45,10 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState;
  *
  * <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}.
  *
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- *            accept values of type {@code KeyedStateTag<? super K, StateT>}.
  * @param <StateT> The type of state being tagged.
  */
 @Experimental(Kind.STATE)
-public interface StateTag<K, StateT extends State> extends Serializable {
+public interface StateTag<StateT extends State> extends Serializable {
 
   /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
   void appendTo(Appendable sb) throws IOException;
@@ -64,7 +61,7 @@ public interface StateTag<K, StateT extends State> extends Serializable {
   /**
    * The specification for the state stored in the referenced cell.
    */
-  StateSpec<K, StateT> getSpec();
+  StateSpec<StateT> getSpec();
 
   /**
    * Bind this state tag. See {@link StateSpec#bind}.
@@ -72,35 +69,34 @@ public interface StateTag<K, StateT extends State> extends Serializable {
    * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now.
    */
   @Deprecated
-  StateT bind(StateBinder<? extends K> binder);
+  StateT bind(StateBinder binder);
 
   /**
    * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
    *
-   * @param <K> the type of key this binder embodies.
    * @deprecated for migration only; runners should reference the top level {@link StateBinder}
    * and move towards {@link StateSpec} rather than {@link StateTag}.
    */
   @Deprecated
-  public interface StateBinder<K> {
-    <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder);
+  public interface StateBinder {
+    <T> ValueState<T> bindValue(StateTag<ValueState<T>> spec, Coder<T> coder);
 
-    <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder);
+    <T> BagState<T> bindBag(StateTag<BagState<T>> spec, Coder<T> elemCoder);
 
-    <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder);
+    <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder);
 
     <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
     <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         CombineFn<InputT, AccumT, OutputT> combineFn);
 
     <InputT, AccumT, OutputT>
         CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
 
@@ -110,7 +106,7 @@ public interface StateTag<K, StateT extends State> extends Serializable {
      * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps
      * added to the returned {@link WatermarkHoldState} are to be combined.
      */
-    <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
+    WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index fe99f27..ca8b238 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -50,29 +50,29 @@ public class StateTags {
 
   /** @deprecated for migration purposes only */
   @Deprecated
-  private static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) {
-    return new StateBinder<K>() {
+  private static StateBinder adaptTagBinder(final StateTag.StateBinder binder) {
+    return new StateBinder() {
       @Override
       public <T> ValueState<T> bindValue(
-          String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) {
+          String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
         return binder.bindValue(tagForSpec(id, spec), coder);
       }
 
       @Override
       public <T> BagState<T> bindBag(
-          String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) {
+          String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
         return binder.bindBag(tagForSpec(id, spec), elemCoder);
       }
 
       @Override
       public <T> SetState<T> bindSet(
-          String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+          String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
         return binder.bindSet(tagForSpec(id, spec), elemCoder);
       }
 
       @Override
       public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
+          String id, StateSpec<MapState<KeyT, ValueT>> spec,
           Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
         return binder.bindMap(tagForSpec(id, spec), mapKeyCoder, mapValueCoder);
       }
@@ -81,7 +81,7 @@ public class StateTags {
       public <InputT, AccumT, OutputT>
       CombiningState<InputT, AccumT, OutputT> bindCombining(
               String id,
-              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+              StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
               CombineFn<InputT, AccumT, OutputT> combineFn) {
         return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
@@ -91,7 +91,7 @@ public class StateTags {
       public <InputT, AccumT, OutputT>
       CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
               String id,
-              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+              StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
               CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
         return binder.bindCombiningValueWithContext(
@@ -99,9 +99,9 @@ public class StateTags {
       }
 
       @Override
-      public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+      public WatermarkHoldState bindWatermark(
           String id,
-          StateSpec<? super K, WatermarkHoldState> spec,
+          StateSpec<WatermarkHoldState> spec,
           TimestampCombiner timestampCombiner) {
         return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
       }
@@ -121,20 +121,20 @@ public class StateTags {
 
   private StateTags() { }
 
-  private interface SystemStateTag<K, StateT extends State> {
-    StateTag<K, StateT> asKind(StateKind kind);
+  private interface SystemStateTag<StateT extends State> {
+    StateTag<StateT> asKind(StateKind kind);
   }
 
   /** Create a state tag for the given id and spec. */
-  public static <K, StateT extends State> StateTag<K, StateT> tagForSpec(
-      String id, StateSpec<K, StateT> spec) {
+  public static <StateT extends State> StateTag<StateT> tagForSpec(
+      String id, StateSpec<StateT> spec) {
     return new SimpleStateTag<>(new StructuredId(id), spec);
   }
 
   /**
    * Create a simple state tag for values of type {@code T}.
    */
-  public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) {
+  public static <T> StateTag<ValueState<T>> value(String id, Coder<T> valueCoder) {
     return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder));
   }
 
@@ -143,7 +143,7 @@ public class StateTags {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-    StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+    StateTag<CombiningState<InputT, AccumT, OutputT>>
     combiningValue(
       String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -155,7 +155,7 @@ public class StateTags {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+      StateTag<CombiningState<InputT, AccumT, OutputT>>
       combiningValueWithContext(
           String id,
           Coder<AccumT> accumCoder,
@@ -172,7 +172,7 @@ public class StateTags {
    * should only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+      StateTag<CombiningState<InputT, AccumT, OutputT>>
       combiningValueFromInputInternal(
           String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -183,21 +183,21 @@ public class StateTags {
    * Create a state tag that is optimized for adding values frequently, and
    * occasionally retrieving all the values that have been added.
    */
-  public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
+  public static <T> StateTag<BagState<T>> bag(String id, Coder<T> elemCoder) {
     return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder));
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Set} like access patterns.
    */
-  public static <T> StateTag<Object, SetState<T>> set(String id, Coder<T> elemCoder) {
+  public static <T> StateTag<SetState<T>> set(String id, Coder<T> elemCoder) {
     return new SimpleStateTag<>(new StructuredId(id), StateSpecs.set(elemCoder));
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Map} like access patterns.
    */
-  public static <K, V> StateTag<Object, MapState<K, V>> map(
+  public static <K, V> StateTag<MapState<K, V>> map(
       String id, Coder<K> keyCoder, Coder<V> valueCoder) {
     return new SimpleStateTag<>(new StructuredId(id), StateSpecs.map(keyCoder, valueCoder));
   }
@@ -205,7 +205,7 @@ public class StateTags {
   /**
    * Create a state tag for holding the watermark.
    */
-  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
+  public static <W extends BoundedWindow> StateTag<WatermarkHoldState>
       watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
     return new SimpleStateTag<>(
         new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner));
@@ -215,20 +215,20 @@ public class StateTags {
    * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to
    * collide with any user tags.
    */
-  public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
-      StateTag<K, StateT> tag) {
+  public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
+      StateTag<StateT> tag) {
     if (!(tag instanceof SystemStateTag)) {
       throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
     }
     // Checked above
     @SuppressWarnings("unchecked")
-    SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag;
+    SystemStateTag<StateT> typedTag = (SystemStateTag<StateT>) tag;
     return typedTag.asKind(StateKind.SYSTEM);
   }
 
-  public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
+  public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>>
       convertToBagTagInternal(
-          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> combiningTag) {
+          StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
     return new SimpleStateTag<>(
         new StructuredId(combiningTag.getId()),
         StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
@@ -291,20 +291,20 @@ public class StateTags {
   /**
    * A basic {@link StateTag} implementation that manages the structured ids.
    */
-  private static class SimpleStateTag<K, StateT extends State>
-      implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
+  private static class SimpleStateTag<StateT extends State>
+      implements StateTag<StateT>, SystemStateTag<StateT> {
 
-    private final StateSpec<K, StateT> spec;
+    private final StateSpec<StateT> spec;
     private final StructuredId id;
 
-    public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) {
+    public SimpleStateTag(StructuredId id, StateSpec<StateT> spec) {
       this.id = id;
       this.spec = spec;
     }
 
     @Override
     @Deprecated
-    public StateT bind(StateTag.StateBinder<? extends K> binder) {
+    public StateT bind(StateTag.StateBinder binder) {
       return spec.bind(
           this.id.getRawId(), adaptTagBinder(binder));
     }
@@ -315,7 +315,7 @@ public class StateTags {
     }
 
     @Override
-    public StateSpec<K, StateT> getSpec() {
+    public StateSpec<StateT> getSpec() {
       return spec;
     }
 
@@ -332,7 +332,7 @@ public class StateTags {
     }
 
     @Override
-    public StateTag<K, StateT> asKind(StateKind kind) {
+    public StateTag<StateT> asKind(StateKind kind) {
       return new SimpleStateTag<>(id.asKind(kind), spec);
     }
 
@@ -342,7 +342,7 @@ public class StateTags {
         return false;
       }
 
-      SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other;
+      SimpleStateTag<?> otherTag = (SimpleStateTag<?>) other;
       return Objects.equals(this.getId(), otherTag.getId())
           && Objects.equals(this.getSpec(), otherTag.getSpec());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 7a20590..e3717a8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -236,12 +236,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
 
     private final DoFn<?, ?> fn;
     private final DoFnSignature signature;
-    private final StateInternals<?> stateInternals;
+    private final StateInternals stateInternals;
     private final Coder<W> windowCoder;
 
     public StateInternalsStateCleaner(
         DoFn<?, ?> fn,
-        StateInternals<?> stateInternals,
+        StateInternals stateInternals,
         Coder<W> windowCoder) {
       this.fn = fn;
       this.signature = DoFnSignatures.getSignature(fn.getClass());
@@ -254,7 +254,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
       for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
           signature.stateDeclarations().entrySet()) {
         try {
-          StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+          StateSpec<?> spec = (StateSpec<?>) entry.getValue().field().get(fn);
           State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
               StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
           state.clear();

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 86a7fd7..f18460a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -47,7 +47,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
    */
   public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
       buffering(final Coder<T> inputCoder) {
-    final StateTag<Object, BagState<T>> bufferTag =
+    final StateTag<BagState<T>> bufferTag =
         StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
     return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
       @Override
@@ -70,7 +70,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
       AccumT, OutputT, W>
       combining(
           final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag;
+    final StateTag<CombiningState<InputT, AccumT, OutputT>> bufferTag;
     if (combineFn.getFn() instanceof CombineFnWithContext) {
       bufferTag = StateTags.makeSystemTagInternal(
           StateTags.<InputT, AccumT, OutputT>combiningValueWithContext(
@@ -96,10 +96,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
     };
   }
 
-  private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag;
+  private StateTag<? extends GroupingState<InputT, OutputT>> bufferTag;
 
   public SystemReduceFn(
-      StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) {
+      StateTag<? extends GroupingState<InputT, OutputT>> bufferTag) {
     this.bufferTag = bufferTag;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 1dfb85f..18b50db 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -32,9 +32,9 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
     super(key);
   }
 
-  public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
-    Set<StateTag<? super K, ?>> inUse = new HashSet<>();
-    for (Map.Entry<StateTag<? super K, ?>, State> entry :
+  public Set<StateTag<?>> getTagsInUse(StateNamespace namespace) {
+    Set<StateTag<?>> inUse = new HashSet<>();
+    for (Map.Entry<StateTag<?>, State> entry :
       inMemoryState.getTagsInUse(namespace).entrySet()) {
       if (!isEmptyForTesting(entry.getValue())) {
         inUse.add(entry.getKey());

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 9bb9c62..e6e4ffb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -54,9 +54,9 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * used for elements.
    */
   public static <W extends BoundedWindow>
-      StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
+      StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
           TimestampCombiner timestampCombiner) {
-    return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
+    return StateTags.<WatermarkHoldState>makeSystemTagInternal(
         StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
   }
 
@@ -67,13 +67,13 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
    * would take the end-of-window time as its element time.)
    */
   @VisibleForTesting
-  public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
+  public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG =
       StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
           "extra", TimestampCombiner.EARLIEST));
 
   private final TimerInternals timerInternals;
   private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateTag<Object, WatermarkHoldState> elementHoldTag;
+  private final StateTag<WatermarkHoldState> elementHoldTag;
 
   public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
     this.timerInternals = timerInternals;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
index 5005065..a824a7b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -40,7 +40,7 @@ public interface WindowingInternals<InputT, OutputT> {
    * Unsupported state internals. The key type is unknown. It is up to the user to use the
    * correct type of key.
    */
-  StateInternals<?> stateInternals();
+  StateInternals stateInternals();
 
   /**
    * Output the value at the specified timestamp in the listed windows.

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index b416788..ed2c26f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -55,7 +55,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
   protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
       ImmutableList.<SerializableFunction<Instant, Instant>>of();
 
-  protected static final StateTag<Object, CombiningState<Instant,
+  protected static final StateTag<CombiningState<Instant,
                                                 Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 11323cc..52fb5ff 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.state.CombiningState;
 @Experimental(Experimental.Kind.TRIGGER)
 public class AfterPaneStateMachine extends OnceTriggerStateMachine {
 
-private static final StateTag<Object, CombiningState<Long, long[], Long>>
+private static final StateTag<CombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "count", VarLongCoder.of(), Sum.ofLongs()));


[3/6] beam git commit: Simplify type parameters of StateSpec and related

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 31e931c..cfe3f9b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.beam.sdk.util.state.BagState;
@@ -62,7 +61,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
  * <p>Note: Ignore index of key.
  * Mainly for SideInputs.
  */
-public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
+public class FlinkBroadcastStateInternals<K> implements StateInternals {
 
   private int indexInSubtaskGroup;
   private final DefaultOperatorStateBackend stateBackend;
@@ -86,7 +85,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -94,36 +93,36 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
 
             return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -133,7 +132,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
@@ -144,7 +143,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return new FlinkCombiningStateWithContext<>(
@@ -158,8 +157,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
@@ -302,11 +301,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       extends AbstractBroadcastState<T> implements ValueState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, ValueState<T>> address;
+    private final StateTag<ValueState<T>> address;
 
     FlinkBroadcastValueState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, ValueState<T>> address,
+        StateTag<ValueState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       super(flinkStateBackend, address.getId(), namespace, coder);
@@ -363,11 +362,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements BagState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
 
     FlinkBroadcastBagState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));
@@ -451,12 +450,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
 
     FlinkCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -568,13 +567,13 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
 
     FlinkKeyedCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -704,14 +703,14 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
     private final CombineWithContext.Context context;
 
     FlinkCombiningStateWithContext(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 67d7966..c9b7797 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.BagState;
@@ -67,7 +66,7 @@ import org.apache.flink.util.Preconditions;
  *
  * <p>Reference from {@link HeapInternalTimerService} to the local key-group range.
  */
-public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
+public class FlinkKeyGroupStateInternals<K> implements StateInternals {
 
   private final Coder<K> keyCoder;
   private final KeyGroupsList localKeyGroupRange;
@@ -109,7 +108,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -117,36 +116,36 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", ValueState.class.getSimpleName()));
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -156,7 +155,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -165,7 +164,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException(
@@ -173,8 +172,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -334,10 +333,10 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
       implements BagState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
 
     FlinkKeyGroupBagState(
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       super(address.getId(), namespace.stringKey(), ListCoder.of(coder),

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index ef6c3b2..3d38f88 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -53,7 +52,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
  *  Ignore index of key and namespace.
  *  Just implement BagState.
  */
-public class FlinkSplitStateInternals<K> implements StateInternals<K> {
+public class FlinkSplitStateInternals<K> implements StateInternals {
 
   private final OperatorStateBackend stateBackend;
 
@@ -69,7 +68,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -77,36 +76,36 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", ValueState.class.getSimpleName()));
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -116,7 +115,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -125,7 +124,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException(
@@ -133,8 +132,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -147,11 +146,11 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
     private final ListStateDescriptor<T> descriptor;
     private OperatorStateBackend flinkStateBackend;
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
 
     FlinkSplitBagState(
         OperatorStateBackend flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       this.flinkStateBackend = flinkStateBackend;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c99d085..c033be6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
  * <p>Note: In the Flink streaming runner the key is always encoded
  * using an {@link Coder} and stored in a {@link ByteBuffer}.
  */
-public class FlinkStateInternals<K> implements StateInternals<K> {
+public class FlinkStateInternals<K> implements StateInternals {
 
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
   private Coder<K> keyCoder;
@@ -95,7 +95,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -103,36 +103,36 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
 
             return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -142,7 +142,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
@@ -153,7 +153,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return new FlinkCombiningStateWithContext<>(
@@ -167,8 +167,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
 
             return new FlinkWatermarkHoldState<>(
@@ -180,13 +180,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   private static class FlinkValueState<K, T> implements ValueState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, ValueState<T>> address;
+    private final StateTag<ValueState<T>> address;
     private final ValueStateDescriptor<T> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkValueState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, ValueState<T>> address,
+        StateTag<ValueState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
 
@@ -266,13 +266,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   private static class FlinkBagState<K, T> implements BagState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
     private final ListStateDescriptor<T> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkBagState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
 
@@ -379,14 +379,14 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -547,7 +547,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -555,7 +555,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
     FlinkKeyedCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -718,7 +718,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -727,7 +727,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
     FlinkCombiningStateWithContext(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -886,7 +886,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
   private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
       implements WatermarkHoldState {
-    private final StateTag<? super K, WatermarkHoldState> address;
+    private final StateTag<WatermarkHoldState> address;
     private final TimestampCombiner timestampCombiner;
     private final StateNamespace namespace;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -896,7 +896,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     public FlinkWatermarkHoldState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
         FlinkStateInternals<K> flinkStateInternals,
-        StateTag<? super K, WatermarkHoldState> address,
+        StateTag<WatermarkHoldState> address,
         StateNamespace namespace,
         TimestampCombiner timestampCombiner) {
       this.address = address;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4e18ac2..bda30e4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -196,7 +196,7 @@ public class DoFnOperatorTest {
     DoFn<Integer, String> fn = new DoFn<Integer, String>() {
 
       @StateId("state")
-      private final StateSpec<Object, ValueState<String>> stateSpec =
+      private final StateSpec<ValueState<String>> stateSpec =
           StateSpecs.value(StringUtf8Coder.of());
 
       @ProcessElement
@@ -296,7 +296,7 @@ public class DoFnOperatorTest {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<String>> stateSpec =
+          private final StateSpec<ValueState<String>> stateSpec =
               StateSpecs.value(StringUtf8Coder.of());
 
           @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 7e7d1e1..eb2c05f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -56,12 +56,12 @@ public class FlinkBroadcastStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
 
   FlinkBroadcastStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
index 5433d07..0e0267b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
@@ -64,7 +64,7 @@ public class FlinkKeyGroupStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
 
   FlinkKeyGroupStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
index 08ae0c4..8033a9d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
@@ -47,7 +47,7 @@ public class FlinkSplitStateInternalsTest {
   private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
 
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
 
   FlinkSplitStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 17c43bf..cd00d9e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,18 +70,18 @@ public class FlinkStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   FlinkStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index ce7f678..38129ab 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -103,7 +103,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
   private static class DummyStatefulDoFn extends DoFn<KV<Integer, Integer>, Integer> {
 
     @StateId("foo")
-    private final StateSpec<Object, ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
+    private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
 
     @ProcessElement
     public void processElem(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 343d51b..63e1166 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -890,7 +890,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
             ParDo.of(
                 new DoFn<KV<Integer, Integer>, Integer>() {
                   @StateId("unused")
-                  final StateSpec<Object, ValueState<Integer>> stateSpec =
+                  final StateSpec<ValueState<Integer>> stateSpec =
                       StateSpecs.value(VarIntCoder.of());
 
                   @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index cdc23ff..afaba3a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
@@ -51,7 +50,7 @@ import org.joda.time.Instant;
 /**
  * An implementation of {@link StateInternals} for the SparkRunner.
  */
-class SparkStateInternals<K> implements StateInternals<K> {
+class SparkStateInternals<K> implements StateInternals {
 
   private final K key;
   //Serializable state for internals (namespace to state tag to coded value).
@@ -86,50 +85,47 @@ class SparkStateInternals<K> implements StateInternals<K> {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
     return state(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
       StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       StateContext<?> c) {
-    return address.bind(new SparkStateBinder(key, namespace, c));
+    return address.bind(new SparkStateBinder(namespace, c));
   }
 
-  private class SparkStateBinder implements StateBinder<K> {
-    private final K key;
+  private class SparkStateBinder implements StateBinder {
     private final StateNamespace namespace;
     private final StateContext<?> c;
 
-    private SparkStateBinder(K key,
-                             StateNamespace namespace,
+    private SparkStateBinder(StateNamespace namespace,
                              StateContext<?> c) {
-      this.key = key;
       this.namespace = namespace;
       this.c = c;
     }
 
     @Override
-    public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+    public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
       return new SparkValueState<>(namespace, address, coder);
     }
 
     @Override
-    public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+    public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
       return new SparkBagState<>(namespace, address, elemCoder);
     }
 
     @Override
-    public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+    public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", SetState.class.getSimpleName()));
     }
 
     @Override
     public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", MapState.class.getSimpleName()));
@@ -138,7 +134,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFn<InputT, AccumT, OutputT> combineFn) {
       return new SparkCombiningState<>(namespace, address, accumCoder, combineFn);
@@ -147,7 +143,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
       return new SparkCombiningState<>(
@@ -155,8 +151,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
+    public WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
     }
@@ -164,12 +160,12 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
   private class AbstractState<T> {
     final StateNamespace namespace;
-    final StateTag<?, ? extends State> address;
+    final StateTag<? extends State> address;
     final Coder<T> coder;
 
     private AbstractState(
         StateNamespace namespace,
-        StateTag<?, ? extends State> address,
+        StateTag<? extends State> address,
         Coder<T> coder) {
       this.namespace = namespace;
       this.address = address;
@@ -218,7 +214,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
     private SparkValueState(
             StateNamespace namespace,
-            StateTag<?, ValueState<T>> address,
+            StateTag<ValueState<T>> address,
             Coder<T> coder) {
       super(namespace, address, coder);
     }
@@ -246,7 +242,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
     public SparkWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState> address,
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       super(namespace, address, InstantCoder.of());
       this.timestampCombiner = timestampCombiner;
@@ -300,7 +296,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
     private SparkCombiningState(
         StateNamespace namespace,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         CombineFn<InputT, AccumT, OutputT> combineFn) {
       super(namespace, address, coder);
@@ -363,7 +359,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
   private final class SparkBagState<T> extends AbstractState<List<T>> implements BagState<T> {
     private SparkBagState(
         StateNamespace namespace,
-        StateTag<?, BagState<T>> address,
+        StateTag<BagState<T>> address,
         Coder<T> coder) {
       super(namespace, address, ListCoder.of(coder));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 0a00c45..063feef 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -93,7 +93,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
     InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
     timerInternals.advanceProcessingTime(Instant.now());
     timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
     GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
 
     ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 3e8dde5..ffe343b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -124,7 +124,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
         Coder<W> windowCoder) throws IOException { }
 
     @Override
-    public StateInternals<?> stateInternals() {
+    public StateInternals stateInternals() {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index ef1ff9f..7b6f9ed 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -65,7 +65,7 @@ public final class TranslationUtils {
    */
   static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable {
     @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
+    public StateInternals stateInternalsForKey(K key) {
       return InMemoryStateInternals.forKey(key);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9b99ca4..0368476 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -331,10 +331,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   /**
    * Annotation for declaring and dereferencing state cells.
    *
-   * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link
-   * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State}
-   * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and
-   * annotate it with {@link StateId}. See the following code for an example:
+   * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a
+   * {@link StateId}. To use the cell during processing, add a parameter of the appropriate {@link
+   * State} subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer}
+   * method, and annotate it with {@link StateId}. See the following code for an example:
    *
    * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index b5547e3..02f3a85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -112,14 +112,14 @@ public class GroupIntoBatches<K, InputT>
     private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
     @StateId(BATCH_ID)
-    private final StateSpec<Object, BagState<InputT>> batchSpec;
+    private final StateSpec<BagState<InputT>> batchSpec;
 
     @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-    private final StateSpec<Object, CombiningState<Long, Long, Long>>
+    private final StateSpec<CombiningState<Long, Long, Long>>
         numElementsInBatchSpec;
 
     @StateId(KEY_ID)
-    private final StateSpec<Object, ValueState<K>> keySpec;
+    private final StateSpec<ValueState<K>> keySpec;
 
     private final long prefetchFrequency;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 1f6afbf..6137a7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -446,7 +446,7 @@ public class ParDo {
     Map<String, DoFnSignature.StateDeclaration> stateDeclarations = signature.stateDeclarations();
     for (DoFnSignature.StateDeclaration stateDeclaration : stateDeclarations.values()) {
       try {
-        StateSpec<?, ?> stateSpec = (StateSpec<?, ?>) stateDeclaration.field().get(fn);
+        StateSpec<?> stateSpec = (StateSpec<?>) stateDeclaration.field().get(fn);
         stateSpec.offerCoders(codersForStateSpecTypes(stateDeclaration, coderRegistry, inputCoder));
         stateSpec.finishSpecifying();
       } catch (IllegalAccessException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 6fe37a1..48fa742 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -20,34 +20,36 @@ package org.apache.beam.sdk.util.state;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 
 /**
  * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
- *
- * @param <K> the type of key this binder embodies.
  */
-public interface StateBinder<K> {
-  <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder);
+public interface StateBinder {
+  <T> ValueState<T> bindValue(
+      String id, StateSpec<ValueState<T>> spec, Coder<T> coder);
 
-  <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder);
+  <T> BagState<T> bindBag(
+      String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder);
 
-  <T> SetState<T> bindSet(String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder);
+  <T> SetState<T> bindSet(
+      String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder);
 
   <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-      String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
-      Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
+      String id,
+      StateSpec<MapState<KeyT, ValueT>> spec,
+      Coder<KeyT> mapKeyCoder,
+      Coder<ValueT> mapValueCoder);
 
   <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
       String id,
-      StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
       String id,
-      StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
 
@@ -57,8 +59,8 @@ public interface StateBinder<K> {
    * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
    * to the returned {@link WatermarkHoldState} are to be combined.
    */
-  <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+  WatermarkHoldState bindWatermark(
       String id,
-      StateSpec<? super K, WatermarkHoldState> spec,
+      StateSpec<WatermarkHoldState> spec,
       TimestampCombiner timestampCombiner);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
index 6b94c40..8eda218 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
@@ -26,23 +26,22 @@ import org.apache.beam.sdk.coders.Coder;
  * A specification of a persistent state cell. This includes information necessary to encode the
  * value and details about the intended access pattern.
  *
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- *            accept values of type {@code StateSpec<? super K, StateT>}.
  * @param <StateT> The type of state being described.
  */
 @Experimental(Kind.STATE)
-public interface StateSpec<K, StateT extends State> extends Serializable {
+public interface StateSpec<StateT extends State> extends Serializable {
 
   /**
    * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
    */
-  StateT bind(String id, StateBinder<? extends K> binder);
+  StateT bind(String id, StateBinder binder);
 
   /**
-   * Given {code coders} are inferred from type arguments defined for this class.
-   * Coders which are already set should take precedence over offered coders.
-   * @param coders Array of coders indexed by the type arguments order. Entries might be null if
-   *               the coder could not be inferred.
+   * Given {@code coders} are inferred from type arguments defined for this class. Coders which are
+   * already set should take precedence over offered coders.
+   *
+   * @param coders Array of coders indexed by the type arguments order. Entries might be null if the
+   *     coder could not be inferred.
    */
   void offerCoders(Coder[] coders);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index a057a0b..49d5722 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 
 /**
@@ -42,12 +41,12 @@ public class StateSpecs {
   private StateSpecs() {}
 
   /** Create a simple state spec for values of type {@code T}. */
-  public static <T> StateSpec<Object, ValueState<T>> value() {
+  public static <T> StateSpec<ValueState<T>> value() {
     return new ValueStateSpec<>(null);
   }
 
   /** Create a simple state spec for values of type {@code T}. */
-  public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) {
+  public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
     checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
     return new ValueStateSpec<>(valueCoder);
   }
@@ -57,17 +56,27 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
       CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
+   * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
+   * multiple {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+          CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+  }
+
+  /**
    * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
@@ -80,11 +89,8 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
           Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
-    checkArgument(accumCoder != null,
-        "accumCoder should not be null. "
-            + "Consider using combining(CombineFn<> combineFn) instead.");
     return combiningInternal(accumCoder, combineFn);
   }
 
@@ -96,7 +102,7 @@ public class StateSpecs {
    * only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
+  StateSpec<CombiningState<InputT, AccumT, OutputT>>
   combiningFromInputInternal(
               Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     try {
@@ -113,13 +119,13 @@ public class StateSpecs {
   }
 
   private static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   private static <InputT, AccumT, OutputT>
-  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
       Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
     return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
@@ -128,7 +134,7 @@ public class StateSpecs {
    * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
    * all the values that have been added.
    */
-  public static <T> StateSpec<Object, BagState<T>> bag() {
+  public static <T> StateSpec<BagState<T>> bag() {
     return bag(null);
   }
 
@@ -136,49 +142,46 @@ public class StateSpecs {
    * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
    * all the values that have been added.
    */
-  public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) {
+  public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
     return new BagStateSpec<>(elemCoder);
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Set} like access patterns.
    */
-  public static <T> StateSpec<Object, SetState<T>> set() {
+  public static <T> StateSpec<SetState<T>> set() {
     return set(null);
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Set} like access patterns.
    */
-  public static <T> StateSpec<Object, SetState<T>> set(Coder<T> elemCoder) {
+  public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
     return new SetStateSpec<>(elemCoder);
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Map} like access patterns.
    */
-  public static <K, V> StateSpec<Object, MapState<K, V>> map() {
+  public static <K, V> StateSpec<MapState<K, V>> map() {
     return new MapStateSpec<>(null, null);
   }
 
-  /**
-   * Create a state spec that supporting for {@link java.util.Map} like access patterns.
-   */
-  public static <K, V> StateSpec<Object, MapState<K, V>> map(Coder<K> keyCoder,
-                                                             Coder<V> valueCoder) {
+  /** Create a state spec that supports {@link java.util.Map}-like access patterns. */
+  public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
     return new MapStateSpec<>(keyCoder, valueCoder);
   }
 
   /** Create a state spec for holding the watermark. */
-  public static <W extends BoundedWindow>
-      StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
+  public static
+      StateSpec<WatermarkHoldState> watermarkStateInternal(
           TimestampCombiner timestampCombiner) {
-    return new WatermarkStateSpecInternal<W>(timestampCombiner);
+    return new WatermarkStateSpecInternal(timestampCombiner);
   }
 
-  public static <K, InputT, AccumT, OutputT>
-      StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
-          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+  public static <InputT, AccumT, OutputT>
+      StateSpec<BagState<AccumT>> convertToBagSpecInternal(
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
     if (combiningSpec instanceof CombiningStateSpec) {
       // Checked above; conversion to a bag spec depends on the provided spec being one of those
       // created via the factory methods in this class.
@@ -201,7 +204,7 @@ public class StateSpecs {
    *
    * <p>Includes the coder for {@code T}.
    */
-  private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> {
+  private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
 
     @Nullable
     private Coder<T> coder;
@@ -211,7 +214,7 @@ public class StateSpecs {
     }
 
     @Override
-    public ValueState<T> bind(String id, StateBinder<?> visitor) {
+    public ValueState<T> bind(String id, StateBinder visitor) {
       return visitor.bindValue(id, this, coder);
     }
 
@@ -260,7 +263,7 @@ public class StateSpecs {
    * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
    */
   private static class CombiningStateSpec<InputT, AccumT, OutputT>
-      implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -275,7 +278,7 @@ public class StateSpecs {
 
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder<? extends Object> visitor) {
+        String id, StateBinder visitor) {
       return visitor.bindCombining(id, this, accumCoder, combineFn);
     }
 
@@ -320,7 +323,7 @@ public class StateSpecs {
       return Objects.hash(getClass(), accumCoder);
     }
 
-    private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+    private StateSpec<BagState<AccumT>> asBagSpec() {
       return new BagStateSpec<AccumT>(accumCoder);
     }
   }
@@ -332,7 +335,7 @@ public class StateSpecs {
    * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
    */
   private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
-      implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable private Coder<AccumT> accumCoder;
     private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
@@ -346,7 +349,7 @@ public class StateSpecs {
 
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder<? extends Object> visitor) {
+        String id, StateBinder visitor) {
       return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
     }
 
@@ -392,7 +395,7 @@ public class StateSpecs {
       return Objects.hash(getClass(), accumCoder);
     }
 
-    private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+    private StateSpec<BagState<AccumT>> asBagSpec() {
       return new BagStateSpec<AccumT>(accumCoder);
     }
   }
@@ -403,7 +406,7 @@ public class StateSpecs {
    *
    * <p>Includes the coder for the element type {@code T}</p>
    */
-  private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> {
+  private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
 
     @Nullable
     private Coder<T> elemCoder;
@@ -413,7 +416,7 @@ public class StateSpecs {
     }
 
     @Override
-    public BagState<T> bind(String id, StateBinder<?> visitor) {
+    public BagState<T> bind(String id, StateBinder visitor) {
       return visitor.bindBag(id, this, elemCoder);
     }
 
@@ -456,7 +459,7 @@ public class StateSpecs {
     }
   }
 
-  private static class MapStateSpec<K, V> implements StateSpec<Object, MapState<K, V>> {
+  private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
 
     @Nullable
     private Coder<K> keyCoder;
@@ -469,7 +472,7 @@ public class StateSpecs {
     }
 
     @Override
-    public MapState<K, V> bind(String id, StateBinder<?> visitor) {
+    public MapState<K, V> bind(String id, StateBinder visitor) {
       return visitor.bindMap(id, this, keyCoder, valueCoder);
     }
 
@@ -523,7 +526,7 @@ public class StateSpecs {
    *
    * <p>Includes the coder for the element type {@code T}</p>
    */
-  private static class SetStateSpec<T> implements StateSpec<Object, SetState<T>> {
+  private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
 
     @Nullable
     private Coder<T> elemCoder;
@@ -533,7 +536,7 @@ public class StateSpecs {
     }
 
     @Override
-    public SetState<T> bind(String id, StateBinder<?> visitor) {
+    public SetState<T> bind(String id, StateBinder visitor) {
       return visitor.bindSet(id, this, elemCoder);
     }
 
@@ -582,8 +585,7 @@ public class StateSpecs {
    * <p>Includes the {@link TimestampCombiner} according to which the output times
    * are combined.
    */
-  private static class WatermarkStateSpecInternal<W extends BoundedWindow>
-      implements StateSpec<Object, WatermarkHoldState> {
+  private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
 
     /**
      * When multiple output times are added to hold the watermark, this determines how they are
@@ -597,7 +599,7 @@ public class StateSpecs {
     }
 
     @Override
-    public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
+    public WatermarkHoldState bind(String id, StateBinder visitor) {
       return visitor.bindWatermark(id, this, timestampCombiner);
     }
 


[6/6] beam git commit: This closes #2627: Remove extraneous type variable from StateSpec

Posted by ke...@apache.org.
This closes #2627: Remove extraneous type variable from StateSpec

  Make DoFnSignatures robust to StateSpec subclasses
  Simplify type parameters of StateSpec and related


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b40b2650
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b40b2650
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b40b2650

Branch: refs/heads/master
Commit: b40b26501805fd6eabc1a5d9ffe60baa4a989bc7
Parents: eec903f 190422c
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 11:55:43 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 11:55:43 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   4 +-
 .../translation/utils/ApexStateInternals.java   |  42 +++--
 .../apex/translation/utils/NoOpStepContext.java |   2 +-
 .../translation/utils/StateInternalsProxy.java  |   6 +-
 .../utils/ApexStateInternalsTest.java           |  12 +-
 .../construction/PTransformMatchersTest.java    |   2 +-
 .../beam/runners/core/BaseExecutionContext.java |   2 +-
 .../beam/runners/core/ExecutionContext.java     |   2 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   2 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   2 +-
 .../runners/core/InMemoryStateInternals.java    |  36 ++--
 .../runners/core/MergingActiveWindowSet.java    |   4 +-
 .../beam/runners/core/MergingStateAccessor.java |   2 +-
 .../apache/beam/runners/core/NonEmptyPanes.java |   2 +-
 .../beam/runners/core/PaneInfoTracker.java      |   2 +-
 .../runners/core/ReduceFnContextFactory.java    |  37 ++--
 .../beam/runners/core/ReduceFnRunner.java       |   4 +-
 .../beam/runners/core/SideInputHandler.java     |  14 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |   8 +-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   2 +-
 .../beam/runners/core/SplittableParDo.java      |   8 +-
 .../apache/beam/runners/core/StateAccessor.java |   2 +-
 .../beam/runners/core/StateInternals.java       |   8 +-
 .../runners/core/StateInternalsFactory.java     |   2 +-
 .../apache/beam/runners/core/StateMerging.java  |  16 +-
 .../apache/beam/runners/core/StateTable.java    |  10 +-
 .../org/apache/beam/runners/core/StateTag.java  |  28 ++-
 .../org/apache/beam/runners/core/StateTags.java |  70 ++++----
 .../beam/runners/core/StatefulDoFnRunner.java   |   6 +-
 .../beam/runners/core/SystemReduceFn.java       |   8 +-
 .../core/TestInMemoryStateInternals.java        |   6 +-
 .../apache/beam/runners/core/WatermarkHold.java |   8 +-
 .../beam/runners/core/WindowingInternals.java   |   2 +-
 .../AfterDelayFromFirstElementStateMachine.java |   2 +-
 .../core/triggers/AfterPaneStateMachine.java    |   2 +-
 .../TriggerStateMachineContextFactory.java      |  12 +-
 .../triggers/TriggerStateMachineRunner.java     |   2 +-
 .../core/GroupAlsoByWindowsProperties.java      |  10 +-
 .../core/InMemoryStateInternalsTest.java        |  16 +-
 .../core/MergingActiveWindowSetTest.java        |   2 +-
 .../beam/runners/core/ReduceFnTester.java       |  18 +-
 .../beam/runners/core/SplittableParDoTest.java  |   2 +-
 .../apache/beam/runners/core/StateTagTest.java  |  62 +++----
 .../runners/core/StatefulDoFnRunnerTest.java    |   4 +-
 .../CopyOnAccessInMemoryStateInternals.java     | 118 ++++++-------
 .../runners/direct/DirectExecutionContext.java  |  15 +-
 .../beam/runners/direct/EvaluationContext.java  |   8 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   6 +-
 .../beam/runners/direct/ParDoEvaluator.java     |   2 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   5 +-
 .../runners/direct/StepTransformResult.java     |   4 +-
 .../beam/runners/direct/TransformResult.java    |   2 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java | 106 +++++------
 .../beam/runners/direct/DirectRunnerTest.java   |   1 +
 .../runners/direct/EvaluationContextTest.java   |  12 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |  10 +-
 .../functions/FlinkNoOpStepContext.java         |   2 +-
 .../functions/FlinkStatefulDoFnFunction.java    |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   6 +-
 .../streaming/SplittableDoFnOperator.java       |   4 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../state/FlinkBroadcastStateInternals.java     |  45 +++--
 .../state/FlinkKeyGroupStateInternals.java      |  29 ++-
 .../state/FlinkSplitStateInternals.java         |  29 ++-
 .../streaming/state/FlinkStateInternals.java    |  48 ++---
 .../flink/streaming/DoFnOperatorTest.java       |   4 +-
 .../FlinkBroadcastStateInternalsTest.java       |   6 +-
 .../FlinkKeyGroupStateInternalsTest.java        |   2 +-
 .../streaming/FlinkSplitStateInternalsTest.java |   2 +-
 .../streaming/FlinkStateInternalsTest.java      |  12 +-
 .../BatchStatefulParDoOverridesTest.java        |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../spark/stateful/SparkStateInternals.java     |  44 +++--
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../spark/translation/TranslationUtils.java     |   2 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   8 +-
 .../beam/sdk/transforms/GroupIntoBatches.java   |   6 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   2 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  25 ++-
 .../apache/beam/sdk/util/state/StateBinder.java |  28 +--
 .../apache/beam/sdk/util/state/StateSpec.java   |  15 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  92 +++++-----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 177 +++++++++----------
 .../transforms/reflect/DoFnInvokersTest.java    |   2 +-
 .../transforms/reflect/DoFnSignaturesTest.java  |  26 +--
 .../beam/fn/harness/fake/FakeStepContext.java   |   2 +-
 89 files changed, 711 insertions(+), 707 deletions(-)
----------------------------------------------------------------------



[4/6] beam git commit: Simplify type parameters of StateSpec and related

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
index 315110d..a056937 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -50,11 +50,11 @@ import org.joda.time.Instant;
 public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
 
   private final WindowFn<?, W> windowFn;
-  private StateInternals<?> stateInternals;
+  private StateInternals stateInternals;
   private final Coder<W> windowCoder;
 
-  public TriggerStateMachineContextFactory(WindowFn<?, W> windowFn,
-      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
+  public TriggerStateMachineContextFactory(
+      WindowFn<?, W> windowFn, StateInternals stateInternals, ActiveWindowSet<W> activeWindows) {
     // Future triggers may be able to exploit the active window to state address window mapping.
     this.windowFn = windowFn;
     this.stateInternals = stateInternals;
@@ -263,7 +263,7 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
     }
 
     @Override
-    public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
+    public <StateT extends State> StateT access(StateTag<StateT> address) {
       return stateInternals.state(windowNamespace, address);
     }
   }
@@ -280,13 +280,13 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
 
     @Override
     public <StateT extends State> StateT access(
-        StateTag<? super Object, StateT> address) {
+        StateTag<StateT> address) {
       return stateInternals.state(windowNamespace, address);
     }
 
     @Override
     public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super Object, StateT> address) {
+        StateTag<StateT> address) {
       ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
       for (W mergingWindow : activeToBeMerged) {
         StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index e26241a..fc2f696 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
  */
 public class TriggerStateMachineRunner<W extends BoundedWindow> {
   @VisibleForTesting
-  public static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+  public static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG =
       StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
 
   private final ExecutableTriggerStateMachine rootTrigger;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index bc33366..054a2e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -570,7 +570,7 @@ public class GroupAlsoByWindowsProperties {
   }
 
   private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
-    private final LoadingCache<K, StateInternals<K>> stateInternalsCache;
+    private final LoadingCache<K, StateInternals> stateInternalsCache;
 
     private CachingStateInternalsFactory() {
       this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
@@ -578,7 +578,7 @@ public class GroupAlsoByWindowsProperties {
 
     @Override
     @SuppressWarnings("unchecked")
-    public StateInternals<K> stateInternalsForKey(K key) {
+    public StateInternals stateInternalsForKey(K key) {
       try {
         return stateInternalsCache.get(key);
       } catch (Exception exc) {
@@ -587,9 +587,9 @@ public class GroupAlsoByWindowsProperties {
     }
   }
 
-  private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> {
+  private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals> {
     @Override
-    public StateInternals<K> load(K key) throws Exception {
+    public StateInternals load(K key) throws Exception {
       return InMemoryStateInternals.forKey(key);
     }
   }
@@ -686,7 +686,7 @@ public class GroupAlsoByWindowsProperties {
         }
 
         @Override
-        public StateInternals<?> stateInternals() {
+        public StateInternals stateInternals() {
           throw new UnsupportedOperationException();
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 6248401..16f7f26 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -60,22 +60,22 @@ public class InMemoryStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, SetState<String>> STRING_SET_ADDR =
+  private static final StateTag<SetState<String>> STRING_SET_ADDR =
       StateTags.set("stringSet", StringUtf8Coder.of());
-  private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
+  private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
       StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
index 95d6977..7a83a18 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -45,7 +45,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class MergingActiveWindowSetTest {
   private Sessions windowFn;
-  private StateInternals<String> state;
+  private StateInternals state;
   private MergingActiveWindowSet<IntervalWindow> set;
   private ActiveWindowSet.MergeCallback<IntervalWindow> callback;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index eba0f67..573855f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -314,14 +314,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<? super String, ?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
+        ImmutableSet.<StateTag<?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
   }
 
   @SafeVarargs
   public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<? super String, ?>>of(
+        ImmutableSet.<StateTag<?>>of(
             TriggerStateMachineRunner.FINISHED_BITS_TAG,
             PaneInfoTracker.PANE_INFO_TAG,
             WatermarkHold.watermarkHoldTagForTimestampCombiner(
@@ -331,14 +331,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   public final void assertHasOnlyGlobalState() {
     assertHasOnlyGlobalAndAllowedTags(
-        Collections.<W>emptySet(), Collections.<StateTag<? super String, ?>>emptySet());
+        Collections.<W>emptySet(), Collections.<StateTag<?>>emptySet());
   }
 
   @SafeVarargs
   public final void assertHasOnlyGlobalAndPaneInfoFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<? super String, ?>>of(
+        ImmutableSet.<StateTag<?>>of(
             PaneInfoTracker.PANE_INFO_TAG,
             WatermarkHold.watermarkHoldTagForTimestampCombiner(
                 objectStrategy.getTimestampCombiner()),
@@ -350,30 +350,30 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    * {@code expectedWindows} and that each of these windows has only tags from {@code allowedTags}.
    */
   private void assertHasOnlyGlobalAndAllowedTags(
-      Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) {
+      Set<W> expectedWindows, Set<StateTag<?>> allowedTags) {
     Set<StateNamespace> expectedWindowsSet = new HashSet<>();
     for (W expectedWindow : expectedWindows) {
       expectedWindowsSet.add(windowNamespace(expectedWindow));
     }
-    Map<StateNamespace, Set<StateTag<? super String, ?>>> actualWindows = new HashMap<>();
+    Map<StateNamespace, Set<StateTag<?>>> actualWindows = new HashMap<>();
 
     for (StateNamespace namespace : stateInternals.getNamespacesInUse()) {
       if (namespace instanceof StateNamespaces.GlobalNamespace) {
         continue;
       } else if (namespace instanceof StateNamespaces.WindowNamespace) {
-        Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+        Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
         if (tagsInUse.isEmpty()) {
           continue;
         }
         actualWindows.put(namespace, tagsInUse);
-        Set<StateTag<? super String, ?>> unexpected = Sets.difference(tagsInUse, allowedTags);
+        Set<StateTag<?>> unexpected = Sets.difference(tagsInUse, allowedTags);
         if (unexpected.isEmpty()) {
           continue;
         } else {
           fail(namespace + " has unexpected states: " + tagsInUse);
         }
       } else if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
-        Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+        Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
         assertTrue(namespace + " contains " + tagsInUse, tagsInUse.isEmpty());
       } else {
         fail("Unrecognized namespace " + namespace);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 1a44453..a67db6d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -231,7 +231,7 @@ public class SplittableParDoTest {
       processFn.setStateInternalsFactory(
           new StateInternalsFactory<String>() {
             @Override
-            public StateInternals<String> stateInternalsForKey(String key) {
+            public StateInternals stateInternalsForKey(String key) {
               return stateInternals;
             }
           });

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 9a8b75c..fc08dcc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -41,10 +41,10 @@ import org.junit.runners.JUnit4;
 public class StateTagTest {
   @Test
   public void testValueEquality() {
-    StateTag<?, ?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
-    StateTag<?, ?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
-    StateTag<?, ?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
-    StateTag<?, ?> barVarInt = StateTags.value("bar", VarIntCoder.of());
+    StateTag<?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
+    StateTag<?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
+    StateTag<?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
+    StateTag<?> barVarInt = StateTags.value("bar", VarIntCoder.of());
 
     assertEquals(fooVarInt1, fooVarInt2);
     assertNotEquals(fooVarInt1, fooBigEndian);
@@ -53,10 +53,10 @@ public class StateTagTest {
 
   @Test
   public void testBagEquality() {
-    StateTag<?, ?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
-    StateTag<?, ?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
-    StateTag<?, ?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
-    StateTag<?, ?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
+    StateTag<?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
+    StateTag<?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
 
     assertEquals(fooVarInt1, fooVarInt2);
     assertNotEquals(fooVarInt1, fooBigEndian);
@@ -65,10 +65,10 @@ public class StateTagTest {
 
   @Test
   public void testSetEquality() {
-    StateTag<?, ?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
-    StateTag<?, ?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
-    StateTag<?, ?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
-    StateTag<?, ?> barVarInt = StateTags.set("bar", VarIntCoder.of());
+    StateTag<?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
+    StateTag<?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
+    StateTag<?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
+    StateTag<?> barVarInt = StateTags.set("bar", VarIntCoder.of());
 
     assertEquals(fooVarInt1, fooVarInt2);
     assertNotEquals(fooVarInt1, fooBigEndian);
@@ -77,15 +77,15 @@ public class StateTagTest {
 
   @Test
   public void testMapEquality() {
-    StateTag<?, ?> fooStringVarInt1 =
+    StateTag<?> fooStringVarInt1 =
         StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
-    StateTag<?, ?> fooStringVarInt2 =
+    StateTag<?> fooStringVarInt2 =
         StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
-    StateTag<?, ?> fooStringBigEndian =
+    StateTag<?> fooStringBigEndian =
         StateTags.map("foo", StringUtf8Coder.of(), BigEndianIntegerCoder.of());
-    StateTag<?, ?> fooVarIntBigEndian =
+    StateTag<?> fooVarIntBigEndian =
         StateTags.map("foo", VarIntCoder.of(), BigEndianIntegerCoder.of());
-    StateTag<?, ?> barStringVarInt =
+    StateTag<?> barStringVarInt =
         StateTags.map("bar", StringUtf8Coder.of(), VarIntCoder.of());
 
     assertEquals(fooStringVarInt1, fooStringVarInt2);
@@ -97,11 +97,11 @@ public class StateTagTest {
 
   @Test
   public void testWatermarkBagEquality() {
-    StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
+    StateTag<?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    StateTag<?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    StateTag<?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
 
-    StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
+    StateTag<?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
 
     // Same id, same fn.
     assertEquals(foo1, foo2);
@@ -119,12 +119,12 @@ public class StateTagTest {
     Coder<Integer> input2 = BigEndianIntegerCoder.of();
     Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
 
-    StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
-    StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
-    StateTag<?, ?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
+    StateTag<?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+    StateTag<?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+    StateTag<?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
 
-    StateTag<?, ?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
-    StateTag<?, ?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
+    StateTag<?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
+    StateTag<?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
 
     // Same name, coder and combineFn
     assertEquals(fooCoder1Max1, fooCoder1Max2);
@@ -162,16 +162,16 @@ public class StateTagTest {
     Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
     Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
 
-    StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder1Max1 = StateTags.combiningValueWithContext(
             "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
-    StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder1Max2 = StateTags.combiningValueWithContext(
         "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
-    StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder1Min = StateTags.combiningValueWithContext(
         "foo", accum1, CombineFnUtil.toFnWithContext(minFn));
 
-    StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder2Max = StateTags.combiningValueWithContext(
         "foo", accum2, CombineFnUtil.toFnWithContext(maxFn));
-    StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext(
+    StateTag<?> barCoder1Max = StateTags.combiningValueWithContext(
         "bar", accum1, CombineFnUtil.toFnWithContext(maxFn));
 
     // Same name, coder and combineFn

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index aeaa63b..f80643a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -131,7 +131,7 @@ public class StatefulDoFnRunnerTest {
     timerInternals.advanceInputWatermark(new Instant(1L));
 
     MyDoFn fn = new MyDoFn();
-    StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
+    StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
 
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
         fn,
@@ -227,7 +227,7 @@ public class StatefulDoFnRunnerTest {
     public final String stateId = "foo";
 
     @StateId(stateId)
-    public final StateSpec<Object, ValueState<Integer>> intState =
+    public final StateSpec<ValueState<Integer>> intState =
         StateSpecs.value(VarIntCoder.of());
 
     @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 92d87b5..ef3a053 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -60,24 +60,25 @@ import org.joda.time.Instant;
  * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
  * accessed, an independent copy will be created within this table.
  */
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> {
-  private final K key;
-  private final CopyOnAccessInMemoryStateTable<K> table;
+public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
+  private final CopyOnAccessInMemoryStateTable table;
+
+  private K key;
 
   /**
    * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null)
    * StateInternals.
    */
-  public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying(
-      K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) {
-    return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
+  public static <K> CopyOnAccessInMemoryStateInternals withUnderlying(
+      K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) {
+    return new CopyOnAccessInMemoryStateInternals<>(key, underlying);
   }
 
   private CopyOnAccessInMemoryStateInternals(
-      K key, CopyOnAccessInMemoryStateInternals<K> underlying) {
+      K key, CopyOnAccessInMemoryStateInternals underlying) {
     this.key = key;
     table =
-        new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table);
+        new CopyOnAccessInMemoryStateTable(underlying == null ? null : underlying.table);
   }
 
   /**
@@ -94,7 +95,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
    *
    * @return this table
    */
-  public CopyOnAccessInMemoryStateInternals<K> commit() {
+  public CopyOnAccessInMemoryStateInternals commit() {
     table.commit();
     return this;
   }
@@ -116,18 +117,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
     return state(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
+      StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
     return table.get(namespace, address, c);
   }
 
   @Override
-  public K getKey() {
+  public Object getKey() {
     return key;
   }
 
@@ -140,9 +141,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
    * {@link StateTable#get(StateNamespace, StateTag, StateContext)}, first attempts to obtain a
    * copy of existing {@link State} from an underlying {@link StateTable}.
    */
-  private static class CopyOnAccessInMemoryStateTable<K> extends StateTable<K> {
-    private final K key;
-    private Optional<StateTable<K>> underlying;
+  private static class CopyOnAccessInMemoryStateTable extends StateTable {
+    private Optional<StateTable> underlying;
 
     /**
      * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}.
@@ -162,17 +162,16 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
      *       when a {@link StateTag} is bound.</li>
      * </ul>
      */
-    private StateBinderFactory<K> binderFactory;
+    private StateBinderFactory binderFactory;
 
     /**
      * The earliest watermark hold in this table.
      */
     private Optional<Instant> earliestWatermarkHold;
 
-    public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) {
-      this.key = key;
+    public CopyOnAccessInMemoryStateTable(StateTable underlying) {
       this.underlying = Optional.fromNullable(underlying);
-      binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying);
+      binderFactory = new CopyOnBindBinderFactory(this.underlying);
       earliestWatermarkHold = Optional.absent();
     }
 
@@ -191,7 +190,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
     private void commit() {
       Instant earliestHold = getEarliestWatermarkHold();
       if (underlying.isPresent()) {
-        ReadThroughBinderFactory<K> readThroughBinder =
+        ReadThroughBinderFactory readThroughBinder =
             new ReadThroughBinderFactory<>(underlying.get());
         binderFactory = readThroughBinder;
         Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
@@ -201,7 +200,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       }
       earliestWatermarkHold = Optional.of(earliestHold);
       clearEmpty();
-      binderFactory = new InMemoryStateBinderFactory<>(key);
+      binderFactory = new InMemoryStateBinderFactory();
       underlying = Optional.absent();
     }
 
@@ -246,37 +245,35 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
     }
 
     @Override
-    protected StateBinder<K> binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
+    protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
       return binderFactory.forNamespace(namespace, c);
     }
 
-    private interface StateBinderFactory<K> {
-      StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c);
+    private interface StateBinderFactory {
+      StateBinder forNamespace(StateNamespace namespace, StateContext<?> c);
     }
 
     /**
      * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound.
      */
-    private static class CopyOnBindBinderFactory<K> implements StateBinderFactory<K> {
-      private final K key;
-      private final Optional<StateTable<K>> underlying;
+    private static class CopyOnBindBinderFactory implements StateBinderFactory {
+      private final Optional<StateTable> underlying;
 
-      public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) {
-        this.key = key;
+      public CopyOnBindBinderFactory(Optional<StateTable> underlying) {
         this.underlying = underlying;
       }
 
-      private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) {
+      private boolean containedInUnderlying(StateNamespace namespace, StateTag<?> tag) {
         return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace)
             && underlying.get().getTagsInUse(namespace).containsKey(tag);
       }
 
       @Override
-      public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
-        return new StateBinder<K>() {
+      public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+        return new StateBinder() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
@@ -291,7 +288,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends ValueState<T>> existingState =
@@ -306,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineFn<InputT, AccumT, OutputT> combineFn) {
             if (containedInUnderlying(namespace, address)) {
@@ -322,7 +319,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends BagState<T>> existingState =
@@ -336,7 +333,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends SetState<T>> existingState =
@@ -350,7 +347,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> address,
+              StateTag<MapState<KeyT, ValueT>> address,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             if (containedInUnderlying(namespace, address)) {
@@ -367,7 +364,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -381,17 +378,17 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
      * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from
      * the underlying table.
      */
-    private static class ReadThroughBinderFactory<K> implements StateBinderFactory<K> {
-      private final StateTable<K> underlying;
+    private static class ReadThroughBinderFactory<K> implements StateBinderFactory {
+      private final StateTable underlying;
 
-      public ReadThroughBinderFactory(StateTable<K> underlying) {
+      public ReadThroughBinderFactory(StateTable underlying) {
         this.underlying = underlying;
       }
 
-      public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) {
+      public Instant readThroughAndGetEarliestHold(StateTable readTo) {
         Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
         for (StateNamespace namespace : underlying.getNamespacesInUse()) {
-          for (Map.Entry<StateTag<? super K, ?>, ? extends State> existingState :
+          for (Map.Entry<StateTag<?>, ? extends State> existingState :
               underlying.getTagsInUse(namespace).entrySet()) {
             if (!((InMemoryState<?>) existingState.getValue()).isCleared()) {
               // Only read through non-cleared values to ensure that completed windows are
@@ -412,44 +409,44 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       }
 
       @Override
-      public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
-        return new StateBinder<K>() {
+      public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+        return new StateBinder() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
               bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> address,
+              StateTag<MapState<KeyT, ValueT>> address,
               Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
             return underlying.get(namespace, address, c);
           }
@@ -457,7 +454,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           @Override
           public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
           bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return bindCombiningValue(
@@ -467,16 +464,13 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       }
     }
 
-    private static class InMemoryStateBinderFactory<K> implements StateBinderFactory<K> {
-      private final K key;
+    private static class InMemoryStateBinderFactory implements StateBinderFactory {
 
-      public InMemoryStateBinderFactory(K key) {
-        this.key = key;
-      }
+      public InMemoryStateBinderFactory() {}
 
       @Override
-      public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
-        return new InMemoryStateBinder<>(key, c);
+      public StateBinder forNamespace(StateNamespace namespace, StateContext<?> c) {
+        return new InMemoryStateBinder(c);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 1108f0d..107f39a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -34,11 +34,14 @@ class DirectExecutionContext
     extends BaseExecutionContext<DirectStepContext> {
   private final Clock clock;
   private final StructuralKey<?> key;
-  private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+  private final CopyOnAccessInMemoryStateInternals existingState;
   private final TransformWatermarks watermarks;
 
-  public DirectExecutionContext(Clock clock, StructuralKey<?> key,
-      CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
+  public DirectExecutionContext(
+      Clock clock,
+      StructuralKey<?> key,
+      CopyOnAccessInMemoryStateInternals existingState,
+      TransformWatermarks watermarks) {
     this.clock = clock;
     this.key = key;
     this.existingState = existingState;
@@ -55,7 +58,7 @@ class DirectExecutionContext
    */
   public class DirectStepContext
       extends BaseExecutionContext.StepContext {
-    private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+    private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
 
     public DirectStepContext(
@@ -64,7 +67,7 @@ class DirectExecutionContext
     }
 
     @Override
-    public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+    public CopyOnAccessInMemoryStateInternals<?> stateInternals() {
       if (stateInternals == null) {
         stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
       }
@@ -84,7 +87,7 @@ class DirectExecutionContext
      * Commits the state of this step, and returns the committed state. If the step has not
      * accessed any state, return null.
      */
-    public CopyOnAccessInMemoryStateInternals<?> commitState() {
+    public CopyOnAccessInMemoryStateInternals commitState() {
       if (stateInternals != null) {
         return stateInternals.commit();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 54ce027..f6d9a36 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -90,7 +90,7 @@ class EvaluationContext {
   private final WatermarkCallbackExecutor callbackExecutor;
 
   /** The stateInternals of the world, by applied PTransform and key. */
-  private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
+  private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals>
       applicationStateInternals;
 
   private final SideInputContainer sideInputContainer;
@@ -179,9 +179,9 @@ class EvaluationContext {
       result.getAggregatorChanges().commit();
     }
     // Update state internals
-    CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
+    CopyOnAccessInMemoryStateInternals theirState = result.getState();
     if (theirState != null) {
-      CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
+      CopyOnAccessInMemoryStateInternals committedState = theirState.commit();
       StepAndKey stepAndKey =
           StepAndKey.of(
               result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
@@ -331,7 +331,7 @@ class EvaluationContext {
     return new DirectExecutionContext(
         clock,
         key,
-        (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
+        (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey),
         watermarkManager.getWatermarks(application));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index f1e29c6..9f567a4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -164,8 +164,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
               (PCollection<KV<K, Iterable<V>>>)
                   Iterables.getOnlyElement(application.getOutputs().values()));
       outputBundles.add(bundle);
-      CopyOnAccessInMemoryStateInternals<K> stateInternals =
-          (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
+      CopyOnAccessInMemoryStateInternals stateInternals =
+          (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
       DirectTimerInternals timerInternals = stepContext.timerInternals();
       ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
           new ReduceFnRunner<>(
@@ -191,7 +191,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     @Override
     public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
       // State is initialized within the constructor. It can never be null.
-      CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+      CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
       return StepTransformResult.<KeyedWorkItem<K, V>>withHold(
               application, state.getEarliestWatermarkHold())
           .withState(state)

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index cab11db..053da31 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -211,7 +211,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
       throw UserCodeException.wrap(e);
     }
     StepTransformResult.Builder<InputT> resultBuilder;
-    CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+    CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
     if (state != null) {
       resultBuilder =
           StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 7efdb52..e0adc40 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -127,7 +127,7 @@ class SplittableProcessElementsEvaluatorFactory<
         new StateInternalsFactory<String>() {
           @SuppressWarnings({"unchecked", "rawtypes"})
           @Override
-          public StateInternals<String> stateInternalsForKey(String key) {
+          public StateInternals stateInternalsForKey(String key) {
             return (StateInternals) stepContext.stateInternals();
           }
         });

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 8793ae8..93ab077 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -175,10 +175,11 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
             @Override
             public void run() {
               for (StateDeclaration stateDecl : signature.stateDeclarations().values()) {
-                StateTag<Object, ?> tag;
+                StateTag<?> tag;
                 try {
                   tag =
-                      StateTags.tagForSpec(stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
+                      StateTags.tagForSpec(
+                          stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
                 } catch (IllegalAccessException e) {
                   throw new RuntimeException(
                       String.format(

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 01b2a72..fe3ae97 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -70,7 +70,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
     private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder;
     private MetricUpdates metricUpdates;
-    private CopyOnAccessInMemoryStateInternals<?> state;
+    private CopyOnAccessInMemoryStateInternals state;
     private TimerUpdate timerUpdate;
     private AggregatorContainer.Mutator aggregatorChanges;
     private final Set<OutputType> producedOutputs;
@@ -109,7 +109,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
       return this;
     }
 
-    public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) {
+    public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals state) {
       this.state = state;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 8bb5f93..bde44ca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -85,7 +85,7 @@ public interface TransformResult<InputT> {
    * <p>If this evaluation did not access state, this may return null.
    */
   @Nullable
-  CopyOnAccessInMemoryStateInternals<?> getState();
+  CopyOnAccessInMemoryStateInternals getState();
 
   /**
    * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 4d04745..3e29a69 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -72,7 +72,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -92,7 +92,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -114,18 +114,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
    */
   @Test
   public void testGetWithPresentInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
+    StateTag<ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
     ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.read(), nullValue(String.class));
 
     underlyingValue.write("bar");
     assertThat(underlyingValue.read(), equalTo("bar"));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     ValueState<String> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), equalTo("bar"));
@@ -140,18 +140,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testBagStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
     BagState<Integer> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.read(), emptyIterable());
 
     underlyingValue.add(1);
     assertThat(underlyingValue.read(), containsInAnyOrder(1));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -166,18 +166,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testSetStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
+    StateTag<SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
     SetState<Integer> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.read(), emptyIterable());
 
     underlyingValue.add(1);
     assertThat(underlyingValue.read(), containsInAnyOrder(1));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -192,11 +192,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testMapStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, MapState<String, Integer>> valueTag =
+    StateTag<MapState<String, Integer>> valueTag =
         StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
     MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.entries().read(), emptyIterable());
@@ -204,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.put("hello", 1);
     assertThat(underlyingValue.get("hello").read(), equalTo(1));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
@@ -221,13 +221,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
-    StateTag<Object, CombiningState<Long, long[], Long>> stateTag =
+    StateTag<CombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
             sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
     GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
@@ -236,7 +236,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(1L);
     assertThat(underlyingValue.read(), equalTo(1L));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(1L));
@@ -251,13 +251,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkHoldStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, WatermarkHoldState> stateTag =
+    StateTag<WatermarkHoldState> stateTag =
         StateTags.watermarkStateInternal("wmstate", timestampCombiner);
     WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), nullValue());
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(new Instant(250L));
     assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
@@ -284,10 +284,10 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithoutUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -304,13 +304,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -331,15 +331,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithClearedInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
+    CopyOnAccessInMemoryStateInternals<String>secondUnderlying =
         spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying));
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -361,13 +361,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithOverwrittenUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -392,15 +392,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithAddedUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     internals.commit();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -416,7 +416,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithEmptyTableIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     internals.commit();
@@ -426,11 +426,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithOnlyClearedValuesIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -444,13 +444,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -475,16 +475,16 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         return new Instant(689743L);
       }
     };
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
 
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+    StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState firstHold =
         internals.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+    StateTag<WatermarkHoldState> secondHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -508,18 +508,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         return new Instant(689743L);
       }
     };
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+    StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+    StateTag<WatermarkHoldState> secondHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -545,18 +545,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
             return new Instant(689743L);
           }
         };
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+    StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(224L));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+    StateTag<WatermarkHoldState> secondHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -568,7 +568,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testGetEarliestHoldBeforeCommit() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     internals

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 51ae12a..6f9adc4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -524,6 +524,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   private static class LongNoDecodeCoder extends CustomCoder<Long> {
+
     @Override
     public void encode(
         Long value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 0c3a8ed..bfbcd79 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -161,7 +161,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
         .getOrCreateStepContext("s1", "s1")
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
     DirectExecutionContext fooContext =
         context.getExecutionContext(createdProducer, myKey);
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
         .getOrCreateStepContext("s1", "s1")
@@ -246,9 +246,9 @@ public class EvaluationContextTest {
     DirectExecutionContext fooContext =
         context.getExecutionContext(downstreamProducer, myKey);
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
-    CopyOnAccessInMemoryStateInternals<Object> state =
+    CopyOnAccessInMemoryStateInternals<?> state =
         fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
     BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
     bag.add(1);
@@ -268,7 +268,7 @@ public class EvaluationContextTest {
     DirectExecutionContext afterResultContext =
         context.getExecutionContext(downstreamProducer, myKey);
 
-    CopyOnAccessInMemoryStateInternals<Object> afterResultState =
+    CopyOnAccessInMemoryStateInternals<?> afterResultState =
         afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
     assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index ecb8130..fc63406 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -92,7 +92,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
   @Mock private transient UncommittedBundle<Integer> mockUncommittedBundle;
 
   private static final String KEY = "any-key";
-  private transient StateInternals<Object> stateInternals =
+  private transient StateInternals stateInternals =
       CopyOnAccessInMemoryStateInternals.<Object>withUnderlying(KEY, null);
 
   private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
@@ -104,7 +104,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    when((StateInternals<Object>) mockStepContext.stateInternals()).thenReturn(stateInternals);
+    when((StateInternals) mockStepContext.stateInternals()).thenReturn(stateInternals);
     when(mockEvaluationContext.createSideInputReader(anyList()))
         .thenReturn(
             SideInputContainer.create(
@@ -133,7 +133,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
                     ParDo.of(
                             new DoFn<KV<String, Integer>, Integer>() {
                               @StateId(stateId)
-                              private final StateSpec<Object, ValueState<String>> spec =
+                              private final StateSpec<ValueState<String>> spec =
                                   StateSpecs.value(StringUtf8Coder.of());
 
                               @ProcessElement
@@ -165,7 +165,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
         StateNamespaces.window(IntervalWindow.getCoder(), firstWindow);
     StateNamespace secondWindowNamespace =
         StateNamespaces.window(IntervalWindow.getCoder(), secondWindow);
-    StateTag<Object, ValueState<String>> tag =
+    StateTag<ValueState<String>> tag =
         StateTags.tagForSpec(stateId, StateSpecs.value(StringUtf8Coder.of()));
 
     // Set up non-empty state. We don't mock + verify calls to clear() but instead
@@ -247,7 +247,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
                         .of(
                             new DoFn<KV<String, Integer>, Integer>() {
                               @StateId(stateId)
-                              private final StateSpec<Object, ValueState<String>> spec =
+                              private final StateSpec<ValueState<String>> spec =
                                   StateSpecs.value(StringUtf8Coder.of());
 
                               @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 847a00a..8640801 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -61,7 +61,7 @@ public class FlinkNoOpStepContext implements StepContext {
   }
 
   @Override
-  public StateInternals<?> stateInternals() {
+  public StateInternals stateInternals() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 879fad7..a79f856 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -123,7 +123,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
         Collections.<TupleTag<?>>emptyList(),
         new FlinkNoOpStepContext() {
           @Override
-          public StateInternals<?> stateInternals() {
+          public StateInternals stateInternals() {
             return stateInternals;
           }
           @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 01830de..d8fd79a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -133,7 +133,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient long currentOutputWatermark;
 
-  private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
+  private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
 
   protected transient FlinkStateInternals<?> stateInternals;
 
@@ -149,7 +149,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient FlinkTimerInternals timerInternals;
 
-  private transient StateInternals<?> pushbackStateInternals;
+  private transient StateInternals pushbackStateInternals;
 
   private transient Optional<Long> pushedBackWatermark;
 
@@ -673,7 +673,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
 
     @Override
-    public StateInternals<?> stateInternals() {
+    public StateInternals stateInternals() {
       return stateInternals;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index fb6762d..1887a99 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -94,10 +94,10 @@ public class SplittableDoFnOperator<
 
     StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
       @Override
-      public StateInternals<String> stateInternalsForKey(String key) {
+      public StateInternals stateInternalsForKey(String key) {
         //this will implicitly be keyed by the key of the incoming
         // element or by the key of a firing timer
-        return (StateInternals<String>) stateInternals;
+        return (StateInternals) stateInternals;
       }
     };
     TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9718734..3899303 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -83,10 +83,10 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
     StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
       @Override
-      public StateInternals<K> stateInternalsForKey(K key) {
+      public StateInternals stateInternalsForKey(K key) {
         //this will implicitly be keyed by the key of the incoming
         // element or by the key of a firing timer
-        return (StateInternals<K>) stateInternals;
+        return (StateInternals) stateInternals;
       }
     };
     TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {


[2/6] beam git commit: Simplify type parameters of StateSpec and related

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52b2f5e..26904aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1625,7 +1625,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1654,7 +1654,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<Integer, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> seenSpec =
+          private final StateSpec<ValueState<Integer>> seenSpec =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1704,7 +1704,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, MyInteger>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<MyInteger>> intState =
+          private final StateSpec<ValueState<MyInteger>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1734,7 +1734,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, MyInteger>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<MyInteger>> intState =
+          private final StateSpec<ValueState<MyInteger>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1765,7 +1765,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, MyInteger>, MyInteger>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<MyInteger>> intState =
+          private final StateSpec<ValueState<MyInteger>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1797,7 +1797,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<List<MyInteger>>> intState =
+          private final StateSpec<ValueState<List<MyInteger>>> intState =
               StateSpecs.value();
 
           @ProcessElement
@@ -1828,7 +1828,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1876,7 +1876,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, KV<String, Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1892,7 +1892,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1929,7 +1929,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Integer>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<Integer>> intState =
+          private final StateSpec<ValueState<Integer>> intState =
               StateSpecs.value(VarIntCoder.of());
 
           @ProcessElement
@@ -1976,7 +1976,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<Integer>> bufferState =
+          private final StateSpec<BagState<Integer>> bufferState =
               StateSpecs.bag(VarIntCoder.of());
 
           @ProcessElement
@@ -2013,7 +2013,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<MyInteger>> bufferState =
+          private final StateSpec<BagState<MyInteger>> bufferState =
               StateSpecs.bag();
 
           @ProcessElement
@@ -2051,7 +2051,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<MyInteger>> bufferState =
+          private final StateSpec<BagState<MyInteger>> bufferState =
               StateSpecs.bag();
 
           @ProcessElement
@@ -2088,10 +2088,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Set<Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, SetState<Integer>> setState =
+          private final StateSpec<SetState<Integer>> setState =
               StateSpecs.set(VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2132,10 +2132,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Set<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+          private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2175,10 +2175,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, Set<MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+          private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2217,10 +2217,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, MapState<String, Integer>> mapState =
+          private final StateSpec<MapState<String, Integer>> mapState =
               StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2264,9 +2264,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+          private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2310,9 +2311,10 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+          private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
           @StateId(countStateId)
-          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+          private final StateSpec<CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2356,16 +2358,13 @@ public class ParDoTest implements Serializable {
           private static final double EPSILON = 0.0001;
 
           @StateId(stateId)
-          private final StateSpec<
-                  Object, CombiningState<Double, CountSum<Double>, Double>>
-              combiningState =
-                  StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+          private final StateSpec<CombiningState<Double, CountSum<Double>, Double>> combiningState =
+              StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
 
           @ProcessElement
           public void processElement(
               ProcessContext c,
-              @StateId(stateId)
-                  CombiningState<Double, CountSum<Double>, Double> state) {
+              @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state) {
             state.add(c.element().getValue());
             Double currentValue = state.read();
             if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2396,40 +2395,38 @@ public class ParDoTest implements Serializable {
           private static final int EXPECTED_SUM = 16;
 
           @StateId(stateId)
-          private final StateSpec<
-              Object, CombiningState<Integer, MyInteger, Integer>>
-              combiningState =
-              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
-                @Override
-                public MyInteger createAccumulator() {
-                  return new MyInteger(0);
-                }
-
-                @Override
-                public MyInteger addInput(MyInteger accumulator, Integer input) {
-                  return new MyInteger(accumulator.getValue() + input);
-                }
-
-                @Override
-                public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
-                  int newValue = 0;
-                  for (MyInteger myInteger : accumulators) {
-                    newValue += myInteger.getValue();
-                  }
-                  return new MyInteger(newValue);
-                }
+          private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+              StateSpecs.combining(
+                  new Combine.CombineFn<Integer, MyInteger, Integer>() {
+                    @Override
+                    public MyInteger createAccumulator() {
+                      return new MyInteger(0);
+                    }
+
+                    @Override
+                    public MyInteger addInput(MyInteger accumulator, Integer input) {
+                      return new MyInteger(accumulator.getValue() + input);
+                    }
+
+                    @Override
+                    public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+                      int newValue = 0;
+                      for (MyInteger myInteger : accumulators) {
+                        newValue += myInteger.getValue();
+                      }
+                      return new MyInteger(newValue);
+                    }
 
-                @Override
-                public Integer extractOutput(MyInteger accumulator) {
-                  return accumulator.getValue();
-                }
-              });
+                    @Override
+                    public Integer extractOutput(MyInteger accumulator) {
+                      return accumulator.getValue();
+                    }
+                  });
 
           @ProcessElement
           public void processElement(
               ProcessContext c,
-              @StateId(stateId)
-                  CombiningState<Integer, MyInteger, Integer> state) {
+              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
@@ -2458,40 +2455,38 @@ public class ParDoTest implements Serializable {
           private static final int EXPECTED_SUM = 16;
 
           @StateId(stateId)
-          private final StateSpec<
-              Object, CombiningState<Integer, MyInteger, Integer>>
-              combiningState =
-              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
-                @Override
-                public MyInteger createAccumulator() {
-                  return new MyInteger(0);
-                }
-
-                @Override
-                public MyInteger addInput(MyInteger accumulator, Integer input) {
-                  return new MyInteger(accumulator.getValue() + input);
-                }
-
-                @Override
-                public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
-                  int newValue = 0;
-                  for (MyInteger myInteger : accumulators) {
-                    newValue += myInteger.getValue();
-                  }
-                  return new MyInteger(newValue);
-                }
+          private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+              StateSpecs.combining(
+                  new Combine.CombineFn<Integer, MyInteger, Integer>() {
+                    @Override
+                    public MyInteger createAccumulator() {
+                      return new MyInteger(0);
+                    }
+
+                    @Override
+                    public MyInteger addInput(MyInteger accumulator, Integer input) {
+                      return new MyInteger(accumulator.getValue() + input);
+                    }
+
+                    @Override
+                    public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+                      int newValue = 0;
+                      for (MyInteger myInteger : accumulators) {
+                        newValue += myInteger.getValue();
+                      }
+                      return new MyInteger(newValue);
+                    }
 
-                @Override
-                public Integer extractOutput(MyInteger accumulator) {
-                  return accumulator.getValue();
-                }
-              });
+                    @Override
+                    public Integer extractOutput(MyInteger accumulator) {
+                      return accumulator.getValue();
+                    }
+                  });
 
           @ProcessElement
           public void processElement(
               ProcessContext c,
-              @StateId(stateId)
-                  CombiningState<Integer, MyInteger, Integer> state) {
+              @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
@@ -2523,7 +2518,7 @@ public class ParDoTest implements Serializable {
         new DoFn<KV<String, Integer>, List<Integer>>() {
 
           @StateId(stateId)
-          private final StateSpec<Object, BagState<Integer>> bufferState =
+          private final StateSpec<BagState<Integer>> bufferState =
               StateSpecs.bag(VarIntCoder.of());
 
           @ProcessElement
@@ -2697,7 +2692,7 @@ public class ParDoTest implements Serializable {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<String>> stateSpec =
+          private final StateSpec<ValueState<String>> stateSpec =
               StateSpecs.value(StringUtf8Coder.of());
 
           @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 5732438..c16eea2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -188,7 +188,7 @@ public class DoFnInvokersTest {
 
     class MockFn extends DoFn<String, String> {
       @StateId(stateId)
-      private final StateSpec<Object, ValueState<Integer>> spec =
+      private final StateSpec<ValueState<Integer>> spec =
           StateSpecs.value(VarIntCoder.of());
 
       @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index e1fa2d1..d6cc4f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -542,11 +542,11 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield1 =
+              private final StateSpec<ValueState<Integer>> myfield1 =
                   StateSpecs.value(VarIntCoder.of());
 
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Long>> myfield2 =
+              private final StateSpec<ValueState<Long>> myfield2 =
                   StateSpecs.value(VarLongCoder.of());
 
               @ProcessElement
@@ -565,7 +565,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private StateSpec<Object, ValueState<Integer>> myfield =
+              private StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -618,7 +618,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield =
+              private final StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -644,7 +644,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield =
+              private final StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -668,7 +668,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("my-id")
-              private final StateSpec<Object, ValueState<Integer>> myfield =
+              private final StateSpec<ValueState<Integer>> myfield =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -683,7 +683,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("foo")
-              private final StateSpec<Object, ValueState<Integer>> bizzle =
+              private final StateSpec<ValueState<Integer>> bizzle =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -728,7 +728,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFnUsingState() {
               @StateId(DoFnUsingState.STATE_ID)
-              private final StateSpec<Object, ValueState<Integer>> spec =
+              private final StateSpec<ValueState<Integer>> spec =
                   StateSpecs.value(VarIntCoder.of());
             }.getClass());
   }
@@ -770,7 +770,7 @@ public class DoFnSignaturesTest {
         DoFnSignatures.getSignature(
             new DoFn<KV<String, Integer>, Long>() {
               @StateId("foo")
-              private final StateSpec<Object, ValueState<Integer>> bizzleDecl =
+              private final StateSpec<ValueState<Integer>> bizzleDecl =
                   StateSpecs.value(VarIntCoder.of());
 
               @ProcessElement
@@ -803,7 +803,7 @@ public class DoFnSignaturesTest {
   public void testSimpleStateIdNamedDoFn() throws Exception {
     class DoFnForTestSimpleStateIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
       @StateId("foo")
-      private final StateSpec<Object, ValueState<Integer>> bizzle =
+      private final StateSpec<ValueState<Integer>> bizzle =
           StateSpecs.value(VarIntCoder.of());
 
       @ProcessElement
@@ -831,7 +831,7 @@ public class DoFnSignaturesTest {
       // Note that in order to have a coder for T it will require initialization in the constructor,
       // but that isn't important for this test
       @StateId("foo")
-      private final StateSpec<Object, ValueState<T>> bizzle = null;
+      private final StateSpec<ValueState<T>> bizzle = null;
 
       @ProcessElement
       public void foo(ProcessContext context) {}
@@ -866,7 +866,7 @@ public class DoFnSignaturesTest {
     public static final String STATE_ID = "my-state-id";
 
     @StateId(STATE_ID)
-    private final StateSpec<Object, ValueState<Integer>> bizzle =
+    private final StateSpec<ValueState<Integer>> bizzle =
         StateSpecs.value(VarIntCoder.of());
   }
 
@@ -882,7 +882,7 @@ public class DoFnSignaturesTest {
     public static final String STATE_ID = "my-state-id";
 
     @StateId(STATE_ID)
-    private final StateSpec<Object, ValueState<String>> myStateSpec =
+    private final StateSpec<ValueState<String>> myStateSpec =
         StateSpecs.value(StringUtf8Coder.of());
 
     @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 9714d72..9b79d11 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -59,7 +59,7 @@ public class FakeStepContext implements StepContext {
   }
 
   @Override
-  public StateInternals<?> stateInternals() {
+  public StateInternals stateInternals() {
     throw new UnsupportedOperationException();
   }