You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:21 UTC

[3/6] beam git commit: Simplify type parameters of StateSpec and related

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 31e931c..cfe3f9b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineContextFactory;
 import org.apache.beam.sdk.util.state.BagState;
@@ -62,7 +61,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
  * <p>Note: Ignore index of key.
  * Mainly for SideInputs.
  */
-public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
+public class FlinkBroadcastStateInternals<K> implements StateInternals {
 
   private int indexInSubtaskGroup;
   private final DefaultOperatorStateBackend stateBackend;
@@ -86,7 +85,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -94,36 +93,36 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
 
             return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -133,7 +132,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
@@ -144,7 +143,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return new FlinkCombiningStateWithContext<>(
@@ -158,8 +157,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
@@ -302,11 +301,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       extends AbstractBroadcastState<T> implements ValueState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, ValueState<T>> address;
+    private final StateTag<ValueState<T>> address;
 
     FlinkBroadcastValueState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, ValueState<T>> address,
+        StateTag<ValueState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       super(flinkStateBackend, address.getId(), namespace, coder);
@@ -363,11 +362,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements BagState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
 
     FlinkBroadcastBagState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));
@@ -451,12 +450,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
 
     FlinkCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -568,13 +567,13 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
 
     FlinkKeyedCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -704,14 +703,14 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
     private final CombineWithContext.Context context;
 
     FlinkCombiningStateWithContext(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 67d7966..c9b7797 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.BagState;
@@ -67,7 +66,7 @@ import org.apache.flink.util.Preconditions;
  *
  * <p>Reference from {@link HeapInternalTimerService} to the local key-group range.
  */
-public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
+public class FlinkKeyGroupStateInternals<K> implements StateInternals {
 
   private final Coder<K> keyCoder;
   private final KeyGroupsList localKeyGroupRange;
@@ -109,7 +108,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -117,36 +116,36 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", ValueState.class.getSimpleName()));
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -156,7 +155,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -165,7 +164,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException(
@@ -173,8 +172,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -334,10 +333,10 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
       implements BagState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
 
     FlinkKeyGroupBagState(
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       super(address.getId(), namespace.stringKey(), ListCoder.of(coder),

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index ef6c3b2..3d38f88 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -53,7 +52,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
  *  Ignore index of key and namespace.
  *  Just implement BagState.
  */
-public class FlinkSplitStateInternals<K> implements StateInternals<K> {
+public class FlinkSplitStateInternals<K> implements StateInternals {
 
   private final OperatorStateBackend stateBackend;
 
@@ -69,7 +68,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -77,36 +76,36 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", ValueState.class.getSimpleName()));
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -116,7 +115,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -125,7 +124,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             throw new UnsupportedOperationException(
@@ -133,8 +132,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -147,11 +146,11 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
     private final ListStateDescriptor<T> descriptor;
     private OperatorStateBackend flinkStateBackend;
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
 
     FlinkSplitBagState(
         OperatorStateBackend flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
       this.flinkStateBackend = flinkStateBackend;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c99d085..c033be6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
  * <p>Note: In the Flink streaming runner the key is always encoded
  * using an {@link Coder} and stored in a {@link ByteBuffer}.
  */
-public class FlinkStateInternals<K> implements StateInternals<K> {
+public class FlinkStateInternals<K> implements StateInternals {
 
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
   private Coder<K> keyCoder;
@@ -95,7 +95,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address) {
+      StateTag<T> address) {
 
     return state(namespace, address, StateContexts.nullContext());
   }
@@ -103,36 +103,36 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       final StateContext<?> context) {
 
     return address.bind(
-        new StateTag.StateBinder<K>() {
+        new StateTag.StateBinder() {
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
 
             return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
 
             return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             throw new UnsupportedOperationException(
                 String.format("%s is not supported", SetState.class.getSimpleName()));
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> spec,
+              StateTag<MapState<KeyT, ValueT>> spec,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             throw new UnsupportedOperationException(
@@ -142,7 +142,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
@@ -153,7 +153,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return new FlinkCombiningStateWithContext<>(
@@ -167,8 +167,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
           }
 
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
 
             return new FlinkWatermarkHoldState<>(
@@ -180,13 +180,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   private static class FlinkValueState<K, T> implements ValueState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, ValueState<T>> address;
+    private final StateTag<ValueState<T>> address;
     private final ValueStateDescriptor<T> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkValueState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, ValueState<T>> address,
+        StateTag<ValueState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
 
@@ -266,13 +266,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
   private static class FlinkBagState<K, T> implements BagState<T> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
+    private final StateTag<BagState<T>> address;
     private final ListStateDescriptor<T> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkBagState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
+        StateTag<BagState<T>> address,
         StateNamespace namespace,
         Coder<T> coder) {
 
@@ -379,14 +379,14 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
     FlinkCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -547,7 +547,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -555,7 +555,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
     FlinkKeyedCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -718,7 +718,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
       implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
     private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -727,7 +727,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
     FlinkCombiningStateWithContext(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -886,7 +886,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
   private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
       implements WatermarkHoldState {
-    private final StateTag<? super K, WatermarkHoldState> address;
+    private final StateTag<WatermarkHoldState> address;
     private final TimestampCombiner timestampCombiner;
     private final StateNamespace namespace;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -896,7 +896,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     public FlinkWatermarkHoldState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
         FlinkStateInternals<K> flinkStateInternals,
-        StateTag<? super K, WatermarkHoldState> address,
+        StateTag<WatermarkHoldState> address,
         StateNamespace namespace,
         TimestampCombiner timestampCombiner) {
       this.address = address;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4e18ac2..bda30e4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -196,7 +196,7 @@ public class DoFnOperatorTest {
     DoFn<Integer, String> fn = new DoFn<Integer, String>() {
 
       @StateId("state")
-      private final StateSpec<Object, ValueState<String>> stateSpec =
+      private final StateSpec<ValueState<String>> stateSpec =
           StateSpecs.value(StringUtf8Coder.of());
 
       @ProcessElement
@@ -296,7 +296,7 @@ public class DoFnOperatorTest {
           private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
           @StateId(stateId)
-          private final StateSpec<Object, ValueState<String>> stateSpec =
+          private final StateSpec<ValueState<String>> stateSpec =
               StateSpecs.value(StringUtf8Coder.of());
 
           @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 7e7d1e1..eb2c05f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -56,12 +56,12 @@ public class FlinkBroadcastStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
 
   FlinkBroadcastStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
index 5433d07..0e0267b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
@@ -64,7 +64,7 @@ public class FlinkKeyGroupStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
 
   FlinkKeyGroupStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
index 08ae0c4..8033a9d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
@@ -47,7 +47,7 @@ public class FlinkSplitStateInternalsTest {
   private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
 
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
 
   FlinkSplitStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 17c43bf..cd00d9e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,18 +70,18 @@ public class FlinkStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   FlinkStateInternals<String> underTest;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index ce7f678..38129ab 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -103,7 +103,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
   private static class DummyStatefulDoFn extends DoFn<KV<Integer, Integer>, Integer> {
 
     @StateId("foo")
-    private final StateSpec<Object, ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
+    private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
 
     @ProcessElement
     public void processElem(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 343d51b..63e1166 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -890,7 +890,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
             ParDo.of(
                 new DoFn<KV<Integer, Integer>, Integer>() {
                   @StateId("unused")
-                  final StateSpec<Object, ValueState<Integer>> stateSpec =
+                  final StateSpec<ValueState<Integer>> stateSpec =
                       StateSpecs.value(VarIntCoder.of());
 
                   @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index cdc23ff..afaba3a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
@@ -51,7 +50,7 @@ import org.joda.time.Instant;
 /**
  * An implementation of {@link StateInternals} for the SparkRunner.
  */
-class SparkStateInternals<K> implements StateInternals<K> {
+class SparkStateInternals<K> implements StateInternals {
 
   private final K key;
   //Serializable state for internals (namespace to state tag to coded value).
@@ -86,50 +85,47 @@ class SparkStateInternals<K> implements StateInternals<K> {
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
     return state(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
       StateNamespace namespace,
-      StateTag<? super K, T> address,
+      StateTag<T> address,
       StateContext<?> c) {
-    return address.bind(new SparkStateBinder(key, namespace, c));
+    return address.bind(new SparkStateBinder(namespace, c));
   }
 
-  private class SparkStateBinder implements StateBinder<K> {
-    private final K key;
+  private class SparkStateBinder implements StateBinder {
     private final StateNamespace namespace;
     private final StateContext<?> c;
 
-    private SparkStateBinder(K key,
-                             StateNamespace namespace,
+    private SparkStateBinder(StateNamespace namespace,
                              StateContext<?> c) {
-      this.key = key;
       this.namespace = namespace;
       this.c = c;
     }
 
     @Override
-    public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+    public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
       return new SparkValueState<>(namespace, address, coder);
     }
 
     @Override
-    public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+    public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
       return new SparkBagState<>(namespace, address, elemCoder);
     }
 
     @Override
-    public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+    public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", SetState.class.getSimpleName()));
     }
 
     @Override
     public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-        StateTag<? super K, MapState<KeyT, ValueT>> spec,
+        StateTag<MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported", MapState.class.getSimpleName()));
@@ -138,7 +134,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFn<InputT, AccumT, OutputT> combineFn) {
       return new SparkCombiningState<>(namespace, address, accumCoder, combineFn);
@@ -147,7 +143,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
       return new SparkCombiningState<>(
@@ -155,8 +151,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-        StateTag<? super K, WatermarkHoldState> address,
+    public WatermarkHoldState bindWatermark(
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
     }
@@ -164,12 +160,12 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
   private class AbstractState<T> {
     final StateNamespace namespace;
-    final StateTag<?, ? extends State> address;
+    final StateTag<? extends State> address;
     final Coder<T> coder;
 
     private AbstractState(
         StateNamespace namespace,
-        StateTag<?, ? extends State> address,
+        StateTag<? extends State> address,
         Coder<T> coder) {
       this.namespace = namespace;
       this.address = address;
@@ -218,7 +214,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
     private SparkValueState(
             StateNamespace namespace,
-            StateTag<?, ValueState<T>> address,
+            StateTag<ValueState<T>> address,
             Coder<T> coder) {
       super(namespace, address, coder);
     }
@@ -246,7 +242,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
     public SparkWatermarkHoldState(
         StateNamespace namespace,
-        StateTag<?, WatermarkHoldState> address,
+        StateTag<WatermarkHoldState> address,
         TimestampCombiner timestampCombiner) {
       super(namespace, address, InstantCoder.of());
       this.timestampCombiner = timestampCombiner;
@@ -300,7 +296,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
 
     private SparkCombiningState(
         StateNamespace namespace,
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         CombineFn<InputT, AccumT, OutputT> combineFn) {
       super(namespace, address, coder);
@@ -363,7 +359,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
   private final class SparkBagState<T> extends AbstractState<List<T>> implements BagState<T> {
     private SparkBagState(
         StateNamespace namespace,
-        StateTag<?, BagState<T>> address,
+        StateTag<BagState<T>> address,
         Coder<T> coder) {
       super(namespace, address, ListCoder.of(coder));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 0a00c45..063feef 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -93,7 +93,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
     InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
     timerInternals.advanceProcessingTime(Instant.now());
     timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
     GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
 
     ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 3e8dde5..ffe343b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -124,7 +124,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
         Coder<W> windowCoder) throws IOException { }
 
     @Override
-    public StateInternals<?> stateInternals() {
+    public StateInternals stateInternals() {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index ef1ff9f..7b6f9ed 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -65,7 +65,7 @@ public final class TranslationUtils {
    */
   static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable {
     @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
+    public StateInternals stateInternalsForKey(K key) {
       return InMemoryStateInternals.forKey(key);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9b99ca4..0368476 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -331,10 +331,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   /**
    * Annotation for declaring and dereferencing state cells.
    *
-   * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link
-   * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State}
-   * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and
-   * annotate it with {@link StateId}. See the following code for an example:
+   * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a
+   * {@link StateId}. To use the cell during processing, add a parameter of the appropriate {@link
+   * State} subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer}
+   * method, and annotate it with {@link StateId}. See the following code for an example:
    *
    * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index b5547e3..02f3a85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -112,14 +112,14 @@ public class GroupIntoBatches<K, InputT>
     private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
     @StateId(BATCH_ID)
-    private final StateSpec<Object, BagState<InputT>> batchSpec;
+    private final StateSpec<BagState<InputT>> batchSpec;
 
     @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-    private final StateSpec<Object, CombiningState<Long, Long, Long>>
+    private final StateSpec<CombiningState<Long, Long, Long>>
         numElementsInBatchSpec;
 
     @StateId(KEY_ID)
-    private final StateSpec<Object, ValueState<K>> keySpec;
+    private final StateSpec<ValueState<K>> keySpec;
 
     private final long prefetchFrequency;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 1f6afbf..6137a7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -446,7 +446,7 @@ public class ParDo {
     Map<String, DoFnSignature.StateDeclaration> stateDeclarations = signature.stateDeclarations();
     for (DoFnSignature.StateDeclaration stateDeclaration : stateDeclarations.values()) {
       try {
-        StateSpec<?, ?> stateSpec = (StateSpec<?, ?>) stateDeclaration.field().get(fn);
+        StateSpec<?> stateSpec = (StateSpec<?>) stateDeclaration.field().get(fn);
         stateSpec.offerCoders(codersForStateSpecTypes(stateDeclaration, coderRegistry, inputCoder));
         stateSpec.finishSpecifying();
       } catch (IllegalAccessException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 6fe37a1..48fa742 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -20,34 +20,36 @@ package org.apache.beam.sdk.util.state;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 
 /**
  * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
- *
- * @param <K> the type of key this binder embodies.
  */
-public interface StateBinder<K> {
-  <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder);
+public interface StateBinder {
+  <T> ValueState<T> bindValue(
+      String id, StateSpec<ValueState<T>> spec, Coder<T> coder);
 
-  <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder);
+  <T> BagState<T> bindBag(
+      String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder);
 
-  <T> SetState<T> bindSet(String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder);
+  <T> SetState<T> bindSet(
+      String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder);
 
   <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-      String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
-      Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
+      String id,
+      StateSpec<MapState<KeyT, ValueT>> spec,
+      Coder<KeyT> mapKeyCoder,
+      Coder<ValueT> mapValueCoder);
 
   <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
       String id,
-      StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
       String id,
-      StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
 
@@ -57,8 +59,8 @@ public interface StateBinder<K> {
    * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
    * to the returned {@link WatermarkHoldState} are to be combined.
    */
-  <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+  WatermarkHoldState bindWatermark(
       String id,
-      StateSpec<? super K, WatermarkHoldState> spec,
+      StateSpec<WatermarkHoldState> spec,
       TimestampCombiner timestampCombiner);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
index 6b94c40..8eda218 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
@@ -26,23 +26,22 @@ import org.apache.beam.sdk.coders.Coder;
  * A specification of a persistent state cell. This includes information necessary to encode the
  * value and details about the intended access pattern.
  *
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- *            accept values of type {@code StateSpec<? super K, StateT>}.
  * @param <StateT> The type of state being described.
  */
 @Experimental(Kind.STATE)
-public interface StateSpec<K, StateT extends State> extends Serializable {
+public interface StateSpec<StateT extends State> extends Serializable {
 
   /**
    * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
    */
-  StateT bind(String id, StateBinder<? extends K> binder);
+  StateT bind(String id, StateBinder binder);
 
   /**
-   * Given {code coders} are inferred from type arguments defined for this class.
-   * Coders which are already set should take precedence over offered coders.
-   * @param coders Array of coders indexed by the type arguments order. Entries might be null if
-   *               the coder could not be inferred.
+   * Given {@code coders} are inferred from type arguments defined for this class. Coders which
+   * are already set should take precedence over offered coders.
+   *
+   * @param coders Array of coders indexed by the type arguments order. Entries might be null if the
+   *     coder could not be inferred.
    */
   void offerCoders(Coder[] coders);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index a057a0b..49d5722 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 
 /**
@@ -42,12 +41,12 @@ public class StateSpecs {
   private StateSpecs() {}
 
   /** Create a simple state spec for values of type {@code T}. */
-  public static <T> StateSpec<Object, ValueState<T>> value() {
+  public static <T> StateSpec<ValueState<T>> value() {
     return new ValueStateSpec<>(null);
   }
 
   /** Create a simple state spec for values of type {@code T}. */
-  public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) {
+  public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
     checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
     return new ValueStateSpec<>(valueCoder);
   }
@@ -57,17 +56,27 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
       CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
+   * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
+   * multiple {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+          CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+  }
+
+  /**
    * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
@@ -80,11 +89,8 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+      StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
           Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
-    checkArgument(accumCoder != null,
-        "accumCoder should not be null. "
-            + "Consider using combining(CombineFn<> combineFn) instead.");
     return combiningInternal(accumCoder, combineFn);
   }
 
@@ -96,7 +102,7 @@ public class StateSpecs {
    * only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
+  StateSpec<CombiningState<InputT, AccumT, OutputT>>
   combiningFromInputInternal(
               Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     try {
@@ -113,13 +119,13 @@ public class StateSpecs {
   }
 
   private static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   private static <InputT, AccumT, OutputT>
-  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
       Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
     return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
@@ -128,7 +134,7 @@ public class StateSpecs {
    * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
    * all the values that have been added.
    */
-  public static <T> StateSpec<Object, BagState<T>> bag() {
+  public static <T> StateSpec<BagState<T>> bag() {
     return bag(null);
   }
 
@@ -136,49 +142,46 @@ public class StateSpecs {
    * Create a state spec that is optimized for adding values frequently, and occasionally retrieving
    * all the values that have been added.
    */
-  public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) {
+  public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
     return new BagStateSpec<>(elemCoder);
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Set} like access patterns.
    */
-  public static <T> StateSpec<Object, SetState<T>> set() {
+  public static <T> StateSpec<SetState<T>> set() {
     return set(null);
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Set} like access patterns.
    */
-  public static <T> StateSpec<Object, SetState<T>> set(Coder<T> elemCoder) {
+  public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
     return new SetStateSpec<>(elemCoder);
   }
 
   /**
    * Create a state spec that supporting for {@link java.util.Map} like access patterns.
    */
-  public static <K, V> StateSpec<Object, MapState<K, V>> map() {
+  public static <K, V> StateSpec<MapState<K, V>> map() {
     return new MapStateSpec<>(null, null);
   }
 
-  /**
-   * Create a state spec that supporting for {@link java.util.Map} like access patterns.
-   */
-  public static <K, V> StateSpec<Object, MapState<K, V>> map(Coder<K> keyCoder,
-                                                             Coder<V> valueCoder) {
+  /** Create a state spec that supports {@link java.util.Map}-like access patterns. */
+  public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
     return new MapStateSpec<>(keyCoder, valueCoder);
   }
 
   /** Create a state spec for holding the watermark. */
-  public static <W extends BoundedWindow>
-      StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
+  public static
+      StateSpec<WatermarkHoldState> watermarkStateInternal(
           TimestampCombiner timestampCombiner) {
-    return new WatermarkStateSpecInternal<W>(timestampCombiner);
+    return new WatermarkStateSpecInternal(timestampCombiner);
   }
 
-  public static <K, InputT, AccumT, OutputT>
-      StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
-          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+  public static <InputT, AccumT, OutputT>
+      StateSpec<BagState<AccumT>> convertToBagSpecInternal(
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
     if (combiningSpec instanceof CombiningStateSpec) {
       // Checked above; conversion to a bag spec depends on the provided spec being one of those
       // created via the factory methods in this class.
@@ -201,7 +204,7 @@ public class StateSpecs {
    *
    * <p>Includes the coder for {@code T}.
    */
-  private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> {
+  private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
 
     @Nullable
     private Coder<T> coder;
@@ -211,7 +214,7 @@ public class StateSpecs {
     }
 
     @Override
-    public ValueState<T> bind(String id, StateBinder<?> visitor) {
+    public ValueState<T> bind(String id, StateBinder visitor) {
       return visitor.bindValue(id, this, coder);
     }
 
@@ -260,7 +263,7 @@ public class StateSpecs {
    * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
    */
   private static class CombiningStateSpec<InputT, AccumT, OutputT>
-      implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -275,7 +278,7 @@ public class StateSpecs {
 
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder<? extends Object> visitor) {
+        String id, StateBinder visitor) {
       return visitor.bindCombining(id, this, accumCoder, combineFn);
     }
 
@@ -320,7 +323,7 @@ public class StateSpecs {
       return Objects.hash(getClass(), accumCoder);
     }
 
-    private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+    private StateSpec<BagState<AccumT>> asBagSpec() {
       return new BagStateSpec<AccumT>(accumCoder);
     }
   }
@@ -332,7 +335,7 @@ public class StateSpecs {
    * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
    */
   private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
-      implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable private Coder<AccumT> accumCoder;
     private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
@@ -346,7 +349,7 @@ public class StateSpecs {
 
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder<? extends Object> visitor) {
+        String id, StateBinder visitor) {
       return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
     }
 
@@ -392,7 +395,7 @@ public class StateSpecs {
       return Objects.hash(getClass(), accumCoder);
     }
 
-    private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+    private StateSpec<BagState<AccumT>> asBagSpec() {
       return new BagStateSpec<AccumT>(accumCoder);
     }
   }
@@ -403,7 +406,7 @@ public class StateSpecs {
    *
    * <p>Includes the coder for the element type {@code T}</p>
    */
-  private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> {
+  private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
 
     @Nullable
     private Coder<T> elemCoder;
@@ -413,7 +416,7 @@ public class StateSpecs {
     }
 
     @Override
-    public BagState<T> bind(String id, StateBinder<?> visitor) {
+    public BagState<T> bind(String id, StateBinder visitor) {
       return visitor.bindBag(id, this, elemCoder);
     }
 
@@ -456,7 +459,7 @@ public class StateSpecs {
     }
   }
 
-  private static class MapStateSpec<K, V> implements StateSpec<Object, MapState<K, V>> {
+  private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
 
     @Nullable
     private Coder<K> keyCoder;
@@ -469,7 +472,7 @@ public class StateSpecs {
     }
 
     @Override
-    public MapState<K, V> bind(String id, StateBinder<?> visitor) {
+    public MapState<K, V> bind(String id, StateBinder visitor) {
       return visitor.bindMap(id, this, keyCoder, valueCoder);
     }
 
@@ -523,7 +526,7 @@ public class StateSpecs {
    *
    * <p>Includes the coder for the element type {@code T}</p>
    */
-  private static class SetStateSpec<T> implements StateSpec<Object, SetState<T>> {
+  private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
 
     @Nullable
     private Coder<T> elemCoder;
@@ -533,7 +536,7 @@ public class StateSpecs {
     }
 
     @Override
-    public SetState<T> bind(String id, StateBinder<?> visitor) {
+    public SetState<T> bind(String id, StateBinder visitor) {
       return visitor.bindSet(id, this, elemCoder);
     }
 
@@ -582,8 +585,7 @@ public class StateSpecs {
    * <p>Includes the {@link TimestampCombiner} according to which the output times
    * are combined.
    */
-  private static class WatermarkStateSpecInternal<W extends BoundedWindow>
-      implements StateSpec<Object, WatermarkHoldState> {
+  private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
 
     /**
      * When multiple output times are added to hold the watermark, this determines how they are
@@ -597,7 +599,7 @@ public class StateSpecs {
     }
 
     @Override
-    public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
+    public WatermarkHoldState bind(String id, StateBinder visitor) {
       return visitor.bindWatermark(id, this, timestampCombiner);
     }