You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:22 UTC

[4/6] beam git commit: Simplify type parameters of StateSpec and related

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
index 315110d..a056937 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -50,11 +50,11 @@ import org.joda.time.Instant;
 public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
 
   private final WindowFn<?, W> windowFn;
-  private StateInternals<?> stateInternals;
+  private StateInternals stateInternals;
   private final Coder<W> windowCoder;
 
-  public TriggerStateMachineContextFactory(WindowFn<?, W> windowFn,
-      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
+  public TriggerStateMachineContextFactory(
+      WindowFn<?, W> windowFn, StateInternals stateInternals, ActiveWindowSet<W> activeWindows) {
     // Future triggers may be able to exploit the active window to state address window mapping.
     this.windowFn = windowFn;
     this.stateInternals = stateInternals;
@@ -263,7 +263,7 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
     }
 
     @Override
-    public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
+    public <StateT extends State> StateT access(StateTag<StateT> address) {
       return stateInternals.state(windowNamespace, address);
     }
   }
@@ -280,13 +280,13 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
 
     @Override
     public <StateT extends State> StateT access(
-        StateTag<? super Object, StateT> address) {
+        StateTag<StateT> address) {
       return stateInternals.state(windowNamespace, address);
     }
 
     @Override
     public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super Object, StateT> address) {
+        StateTag<StateT> address) {
       ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
       for (W mergingWindow : activeToBeMerged) {
         StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index e26241a..fc2f696 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
  */
 public class TriggerStateMachineRunner<W extends BoundedWindow> {
   @VisibleForTesting
-  public static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+  public static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG =
       StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
 
   private final ExecutableTriggerStateMachine rootTrigger;

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index bc33366..054a2e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -570,7 +570,7 @@ public class GroupAlsoByWindowsProperties {
   }
 
   private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
-    private final LoadingCache<K, StateInternals<K>> stateInternalsCache;
+    private final LoadingCache<K, StateInternals> stateInternalsCache;
 
     private CachingStateInternalsFactory() {
       this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
@@ -578,7 +578,7 @@ public class GroupAlsoByWindowsProperties {
 
     @Override
     @SuppressWarnings("unchecked")
-    public StateInternals<K> stateInternalsForKey(K key) {
+    public StateInternals stateInternalsForKey(K key) {
       try {
         return stateInternalsCache.get(key);
       } catch (Exception exc) {
@@ -587,9 +587,9 @@ public class GroupAlsoByWindowsProperties {
     }
   }
 
-  private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> {
+  private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals> {
     @Override
-    public StateInternals<K> load(K key) throws Exception {
+    public StateInternals load(K key) throws Exception {
       return InMemoryStateInternals.forKey(key);
     }
   }
@@ -686,7 +686,7 @@ public class GroupAlsoByWindowsProperties {
         }
 
         @Override
-        public StateInternals<?> stateInternals() {
+        public StateInternals stateInternals() {
           throw new UnsupportedOperationException();
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 6248401..16f7f26 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -60,22 +60,22 @@ public class InMemoryStateInternalsTest {
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, SetState<String>> STRING_SET_ADDR =
+  private static final StateTag<SetState<String>> STRING_SET_ADDR =
       StateTags.set("stringSet", StringUtf8Coder.of());
-  private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
+  private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
       StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
 
   InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
index 95d6977..7a83a18 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -45,7 +45,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class MergingActiveWindowSetTest {
   private Sessions windowFn;
-  private StateInternals<String> state;
+  private StateInternals state;
   private MergingActiveWindowSet<IntervalWindow> set;
   private ActiveWindowSet.MergeCallback<IntervalWindow> callback;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index eba0f67..573855f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -314,14 +314,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<? super String, ?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
+        ImmutableSet.<StateTag<?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
   }
 
   @SafeVarargs
   public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<? super String, ?>>of(
+        ImmutableSet.<StateTag<?>>of(
             TriggerStateMachineRunner.FINISHED_BITS_TAG,
             PaneInfoTracker.PANE_INFO_TAG,
             WatermarkHold.watermarkHoldTagForTimestampCombiner(
@@ -331,14 +331,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   public final void assertHasOnlyGlobalState() {
     assertHasOnlyGlobalAndAllowedTags(
-        Collections.<W>emptySet(), Collections.<StateTag<? super String, ?>>emptySet());
+        Collections.<W>emptySet(), Collections.<StateTag<?>>emptySet());
   }
 
   @SafeVarargs
   public final void assertHasOnlyGlobalAndPaneInfoFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<? super String, ?>>of(
+        ImmutableSet.<StateTag<?>>of(
             PaneInfoTracker.PANE_INFO_TAG,
             WatermarkHold.watermarkHoldTagForTimestampCombiner(
                 objectStrategy.getTimestampCombiner()),
@@ -350,30 +350,30 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    * {@code expectedWindows} and that each of these windows has only tags from {@code allowedTags}.
    */
   private void assertHasOnlyGlobalAndAllowedTags(
-      Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) {
+      Set<W> expectedWindows, Set<StateTag<?>> allowedTags) {
     Set<StateNamespace> expectedWindowsSet = new HashSet<>();
     for (W expectedWindow : expectedWindows) {
       expectedWindowsSet.add(windowNamespace(expectedWindow));
     }
-    Map<StateNamespace, Set<StateTag<? super String, ?>>> actualWindows = new HashMap<>();
+    Map<StateNamespace, Set<StateTag<?>>> actualWindows = new HashMap<>();
 
     for (StateNamespace namespace : stateInternals.getNamespacesInUse()) {
       if (namespace instanceof StateNamespaces.GlobalNamespace) {
         continue;
       } else if (namespace instanceof StateNamespaces.WindowNamespace) {
-        Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+        Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
         if (tagsInUse.isEmpty()) {
           continue;
         }
         actualWindows.put(namespace, tagsInUse);
-        Set<StateTag<? super String, ?>> unexpected = Sets.difference(tagsInUse, allowedTags);
+        Set<StateTag<?>> unexpected = Sets.difference(tagsInUse, allowedTags);
         if (unexpected.isEmpty()) {
           continue;
         } else {
           fail(namespace + " has unexpected states: " + tagsInUse);
         }
       } else if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
-        Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+        Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
         assertTrue(namespace + " contains " + tagsInUse, tagsInUse.isEmpty());
       } else {
         fail("Unrecognized namespace " + namespace);

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 1a44453..a67db6d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -231,7 +231,7 @@ public class SplittableParDoTest {
       processFn.setStateInternalsFactory(
           new StateInternalsFactory<String>() {
             @Override
-            public StateInternals<String> stateInternalsForKey(String key) {
+            public StateInternals stateInternalsForKey(String key) {
               return stateInternals;
             }
           });

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 9a8b75c..fc08dcc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -41,10 +41,10 @@ import org.junit.runners.JUnit4;
 public class StateTagTest {
   @Test
   public void testValueEquality() {
-    StateTag<?, ?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
-    StateTag<?, ?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
-    StateTag<?, ?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
-    StateTag<?, ?> barVarInt = StateTags.value("bar", VarIntCoder.of());
+    StateTag<?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
+    StateTag<?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
+    StateTag<?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
+    StateTag<?> barVarInt = StateTags.value("bar", VarIntCoder.of());
 
     assertEquals(fooVarInt1, fooVarInt2);
     assertNotEquals(fooVarInt1, fooBigEndian);
@@ -53,10 +53,10 @@ public class StateTagTest {
 
   @Test
   public void testBagEquality() {
-    StateTag<?, ?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
-    StateTag<?, ?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
-    StateTag<?, ?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
-    StateTag<?, ?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
+    StateTag<?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
+    StateTag<?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
 
     assertEquals(fooVarInt1, fooVarInt2);
     assertNotEquals(fooVarInt1, fooBigEndian);
@@ -65,10 +65,10 @@ public class StateTagTest {
 
   @Test
   public void testSetEquality() {
-    StateTag<?, ?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
-    StateTag<?, ?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
-    StateTag<?, ?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
-    StateTag<?, ?> barVarInt = StateTags.set("bar", VarIntCoder.of());
+    StateTag<?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
+    StateTag<?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
+    StateTag<?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
+    StateTag<?> barVarInt = StateTags.set("bar", VarIntCoder.of());
 
     assertEquals(fooVarInt1, fooVarInt2);
     assertNotEquals(fooVarInt1, fooBigEndian);
@@ -77,15 +77,15 @@ public class StateTagTest {
 
   @Test
   public void testMapEquality() {
-    StateTag<?, ?> fooStringVarInt1 =
+    StateTag<?> fooStringVarInt1 =
         StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
-    StateTag<?, ?> fooStringVarInt2 =
+    StateTag<?> fooStringVarInt2 =
         StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
-    StateTag<?, ?> fooStringBigEndian =
+    StateTag<?> fooStringBigEndian =
         StateTags.map("foo", StringUtf8Coder.of(), BigEndianIntegerCoder.of());
-    StateTag<?, ?> fooVarIntBigEndian =
+    StateTag<?> fooVarIntBigEndian =
         StateTags.map("foo", VarIntCoder.of(), BigEndianIntegerCoder.of());
-    StateTag<?, ?> barStringVarInt =
+    StateTag<?> barStringVarInt =
         StateTags.map("bar", StringUtf8Coder.of(), VarIntCoder.of());
 
     assertEquals(fooStringVarInt1, fooStringVarInt2);
@@ -97,11 +97,11 @@ public class StateTagTest {
 
   @Test
   public void testWatermarkBagEquality() {
-    StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
-    StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
+    StateTag<?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    StateTag<?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+    StateTag<?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
 
-    StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
+    StateTag<?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
 
     // Same id, same fn.
     assertEquals(foo1, foo2);
@@ -119,12 +119,12 @@ public class StateTagTest {
     Coder<Integer> input2 = BigEndianIntegerCoder.of();
     Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
 
-    StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
-    StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
-    StateTag<?, ?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
+    StateTag<?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+    StateTag<?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+    StateTag<?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
 
-    StateTag<?, ?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
-    StateTag<?, ?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
+    StateTag<?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
+    StateTag<?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
 
     // Same name, coder and combineFn
     assertEquals(fooCoder1Max1, fooCoder1Max2);
@@ -162,16 +162,16 @@ public class StateTagTest {
     Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
     Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
 
-    StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder1Max1 = StateTags.combiningValueWithContext(
             "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
-    StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder1Max2 = StateTags.combiningValueWithContext(
         "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
-    StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder1Min = StateTags.combiningValueWithContext(
         "foo", accum1, CombineFnUtil.toFnWithContext(minFn));
 
-    StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext(
+    StateTag<?> fooCoder2Max = StateTags.combiningValueWithContext(
         "foo", accum2, CombineFnUtil.toFnWithContext(maxFn));
-    StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext(
+    StateTag<?> barCoder1Max = StateTags.combiningValueWithContext(
         "bar", accum1, CombineFnUtil.toFnWithContext(maxFn));
 
     // Same name, coder and combineFn

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index aeaa63b..f80643a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -131,7 +131,7 @@ public class StatefulDoFnRunnerTest {
     timerInternals.advanceInputWatermark(new Instant(1L));
 
     MyDoFn fn = new MyDoFn();
-    StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
+    StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
 
     DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
         fn,
@@ -227,7 +227,7 @@ public class StatefulDoFnRunnerTest {
     public final String stateId = "foo";
 
     @StateId(stateId)
-    public final StateSpec<Object, ValueState<Integer>> intState =
+    public final StateSpec<ValueState<Integer>> intState =
         StateSpecs.value(VarIntCoder.of());
 
     @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 92d87b5..ef3a053 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -60,24 +60,25 @@ import org.joda.time.Instant;
  * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
  * accessed, an independent copy will be created within this table.
  */
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> {
-  private final K key;
-  private final CopyOnAccessInMemoryStateTable<K> table;
+public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
+  private final CopyOnAccessInMemoryStateTable table;
+
+  private K key;
 
   /**
    * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null)
    * StateInternals.
    */
-  public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying(
-      K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) {
-    return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
+  public static <K> CopyOnAccessInMemoryStateInternals withUnderlying(
+      K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) {
+    return new CopyOnAccessInMemoryStateInternals<>(key, underlying);
   }
 
   private CopyOnAccessInMemoryStateInternals(
-      K key, CopyOnAccessInMemoryStateInternals<K> underlying) {
+      K key, CopyOnAccessInMemoryStateInternals underlying) {
     this.key = key;
     table =
-        new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table);
+        new CopyOnAccessInMemoryStateTable(underlying == null ? null : underlying.table);
   }
 
   /**
@@ -94,7 +95,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
    *
    * @return this table
    */
-  public CopyOnAccessInMemoryStateInternals<K> commit() {
+  public CopyOnAccessInMemoryStateInternals commit() {
     table.commit();
     return this;
   }
@@ -116,18 +117,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
   }
 
   @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+  public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
     return state(namespace, address, StateContexts.nullContext());
   }
 
   @Override
   public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
+      StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
     return table.get(namespace, address, c);
   }
 
   @Override
-  public K getKey() {
+  public Object getKey() {
     return key;
   }
 
@@ -140,9 +141,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
    * {@link StateTable#get(StateNamespace, StateTag, StateContext)}, first attempts to obtain a
    * copy of existing {@link State} from an underlying {@link StateTable}.
    */
-  private static class CopyOnAccessInMemoryStateTable<K> extends StateTable<K> {
-    private final K key;
-    private Optional<StateTable<K>> underlying;
+  private static class CopyOnAccessInMemoryStateTable extends StateTable {
+    private Optional<StateTable> underlying;
 
     /**
      * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}.
@@ -162,17 +162,16 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
      *       when a {@link StateTag} is bound.</li>
      * </ul>
      */
-    private StateBinderFactory<K> binderFactory;
+    private StateBinderFactory binderFactory;
 
     /**
      * The earliest watermark hold in this table.
      */
     private Optional<Instant> earliestWatermarkHold;
 
-    public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) {
-      this.key = key;
+    public CopyOnAccessInMemoryStateTable(StateTable underlying) {
       this.underlying = Optional.fromNullable(underlying);
-      binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying);
+      binderFactory = new CopyOnBindBinderFactory(this.underlying);
       earliestWatermarkHold = Optional.absent();
     }
 
@@ -191,7 +190,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
     private void commit() {
       Instant earliestHold = getEarliestWatermarkHold();
       if (underlying.isPresent()) {
-        ReadThroughBinderFactory<K> readThroughBinder =
+        ReadThroughBinderFactory readThroughBinder =
             new ReadThroughBinderFactory<>(underlying.get());
         binderFactory = readThroughBinder;
         Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
@@ -201,7 +200,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       }
       earliestWatermarkHold = Optional.of(earliestHold);
       clearEmpty();
-      binderFactory = new InMemoryStateBinderFactory<>(key);
+      binderFactory = new InMemoryStateBinderFactory();
       underlying = Optional.absent();
     }
 
@@ -246,37 +245,35 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
     }
 
     @Override
-    protected StateBinder<K> binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
+    protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
       return binderFactory.forNamespace(namespace, c);
     }
 
-    private interface StateBinderFactory<K> {
-      StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c);
+    private interface StateBinderFactory {
+      StateBinder forNamespace(StateNamespace namespace, StateContext<?> c);
     }
 
     /**
      * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound.
      */
-    private static class CopyOnBindBinderFactory<K> implements StateBinderFactory<K> {
-      private final K key;
-      private final Optional<StateTable<K>> underlying;
+    private static class CopyOnBindBinderFactory implements StateBinderFactory {
+      private final Optional<StateTable> underlying;
 
-      public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) {
-        this.key = key;
+      public CopyOnBindBinderFactory(Optional<StateTable> underlying) {
         this.underlying = underlying;
       }
 
-      private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) {
+      private boolean containedInUnderlying(StateNamespace namespace, StateTag<?> tag) {
         return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace)
             && underlying.get().getTagsInUse(namespace).containsKey(tag);
       }
 
       @Override
-      public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
-        return new StateBinder<K>() {
+      public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+        return new StateBinder() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
@@ -291,7 +288,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends ValueState<T>> existingState =
@@ -306,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineFn<InputT, AccumT, OutputT> combineFn) {
             if (containedInUnderlying(namespace, address)) {
@@ -322,7 +319,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends BagState<T>> existingState =
@@ -336,7 +333,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends SetState<T>> existingState =
@@ -350,7 +347,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> address,
+              StateTag<MapState<KeyT, ValueT>> address,
               Coder<KeyT> mapKeyCoder,
               Coder<ValueT> mapValueCoder) {
             if (containedInUnderlying(namespace, address)) {
@@ -367,7 +364,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           @Override
           public <InputT, AccumT, OutputT>
               CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -381,17 +378,17 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
      * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from
      * the underlying table.
      */
-    private static class ReadThroughBinderFactory<K> implements StateBinderFactory<K> {
-      private final StateTable<K> underlying;
+    private static class ReadThroughBinderFactory<K> implements StateBinderFactory {
+      private final StateTable underlying;
 
-      public ReadThroughBinderFactory(StateTable<K> underlying) {
+      public ReadThroughBinderFactory(StateTable underlying) {
         this.underlying = underlying;
       }
 
-      public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) {
+      public Instant readThroughAndGetEarliestHold(StateTable readTo) {
         Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
         for (StateNamespace namespace : underlying.getNamespacesInUse()) {
-          for (Map.Entry<StateTag<? super K, ?>, ? extends State> existingState :
+          for (Map.Entry<StateTag<?>, ? extends State> existingState :
               underlying.getTagsInUse(namespace).entrySet()) {
             if (!((InMemoryState<?>) existingState.getValue()).isCleared()) {
               // Only read through non-cleared values to ensure that completed windows are
@@ -412,44 +409,44 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       }
 
       @Override
-      public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
-        return new StateBinder<K>() {
+      public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+        return new StateBinder() {
           @Override
-          public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
-              StateTag<? super K, WatermarkHoldState> address,
+          public WatermarkHoldState bindWatermark(
+              StateTag<WatermarkHoldState> address,
               TimestampCombiner timestampCombiner) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <T> ValueState<T> bindValue(
-              StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+              StateTag<ValueState<T>> address, Coder<T> coder) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
               bindCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <T> BagState<T> bindBag(
-              StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+              StateTag<BagState<T>> address, Coder<T> elemCoder) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <T> SetState<T> bindSet(
-              StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+              StateTag<SetState<T>> address, Coder<T> elemCoder) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<? super K, MapState<KeyT, ValueT>> address,
+              StateTag<MapState<KeyT, ValueT>> address,
               Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
             return underlying.get(namespace, address, c);
           }
@@ -457,7 +454,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           @Override
           public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
           bindCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
             return bindCombiningValue(
@@ -467,16 +464,13 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
       }
     }
 
-    private static class InMemoryStateBinderFactory<K> implements StateBinderFactory<K> {
-      private final K key;
+    private static class InMemoryStateBinderFactory implements StateBinderFactory {
 
-      public InMemoryStateBinderFactory(K key) {
-        this.key = key;
-      }
+      public InMemoryStateBinderFactory() {}
 
       @Override
-      public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
-        return new InMemoryStateBinder<>(key, c);
+      public StateBinder forNamespace(StateNamespace namespace, StateContext<?> c) {
+        return new InMemoryStateBinder(c);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 1108f0d..107f39a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -34,11 +34,14 @@ class DirectExecutionContext
     extends BaseExecutionContext<DirectStepContext> {
   private final Clock clock;
   private final StructuralKey<?> key;
-  private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+  private final CopyOnAccessInMemoryStateInternals existingState;
   private final TransformWatermarks watermarks;
 
-  public DirectExecutionContext(Clock clock, StructuralKey<?> key,
-      CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
+  public DirectExecutionContext(
+      Clock clock,
+      StructuralKey<?> key,
+      CopyOnAccessInMemoryStateInternals existingState,
+      TransformWatermarks watermarks) {
     this.clock = clock;
     this.key = key;
     this.existingState = existingState;
@@ -55,7 +58,7 @@ class DirectExecutionContext
    */
   public class DirectStepContext
       extends BaseExecutionContext.StepContext {
-    private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+    private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
 
     public DirectStepContext(
@@ -64,7 +67,7 @@ class DirectExecutionContext
     }
 
     @Override
-    public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+    public CopyOnAccessInMemoryStateInternals<?> stateInternals() {
       if (stateInternals == null) {
         stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
       }
@@ -84,7 +87,7 @@ class DirectExecutionContext
      * Commits the state of this step, and returns the committed state. If the step has not
      * accessed any state, return null.
      */
-    public CopyOnAccessInMemoryStateInternals<?> commitState() {
+    public CopyOnAccessInMemoryStateInternals commitState() {
       if (stateInternals != null) {
         return stateInternals.commit();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 54ce027..f6d9a36 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -90,7 +90,7 @@ class EvaluationContext {
   private final WatermarkCallbackExecutor callbackExecutor;
 
   /** The stateInternals of the world, by applied PTransform and key. */
-  private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
+  private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals>
       applicationStateInternals;
 
   private final SideInputContainer sideInputContainer;
@@ -179,9 +179,9 @@ class EvaluationContext {
       result.getAggregatorChanges().commit();
     }
     // Update state internals
-    CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
+    CopyOnAccessInMemoryStateInternals theirState = result.getState();
     if (theirState != null) {
-      CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
+      CopyOnAccessInMemoryStateInternals committedState = theirState.commit();
       StepAndKey stepAndKey =
           StepAndKey.of(
               result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
@@ -331,7 +331,7 @@ class EvaluationContext {
     return new DirectExecutionContext(
         clock,
         key,
-        (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
+        (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey),
         watermarkManager.getWatermarks(application));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index f1e29c6..9f567a4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -164,8 +164,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
               (PCollection<KV<K, Iterable<V>>>)
                   Iterables.getOnlyElement(application.getOutputs().values()));
       outputBundles.add(bundle);
-      CopyOnAccessInMemoryStateInternals<K> stateInternals =
-          (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
+      CopyOnAccessInMemoryStateInternals stateInternals =
+          (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
       DirectTimerInternals timerInternals = stepContext.timerInternals();
       ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
           new ReduceFnRunner<>(
@@ -191,7 +191,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     @Override
     public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
       // State is initialized within the constructor. It can never be null.
-      CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+      CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
       return StepTransformResult.<KeyedWorkItem<K, V>>withHold(
               application, state.getEarliestWatermarkHold())
           .withState(state)

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index cab11db..053da31 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -211,7 +211,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
       throw UserCodeException.wrap(e);
     }
     StepTransformResult.Builder<InputT> resultBuilder;
-    CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+    CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
     if (state != null) {
       resultBuilder =
           StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 7efdb52..e0adc40 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -127,7 +127,7 @@ class SplittableProcessElementsEvaluatorFactory<
         new StateInternalsFactory<String>() {
           @SuppressWarnings({"unchecked", "rawtypes"})
           @Override
-          public StateInternals<String> stateInternalsForKey(String key) {
+          public StateInternals stateInternalsForKey(String key) {
             return (StateInternals) stepContext.stateInternals();
           }
         });

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 8793ae8..93ab077 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -175,10 +175,11 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
             @Override
             public void run() {
               for (StateDeclaration stateDecl : signature.stateDeclarations().values()) {
-                StateTag<Object, ?> tag;
+                StateTag<?> tag;
                 try {
                   tag =
-                      StateTags.tagForSpec(stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
+                      StateTags.tagForSpec(
+                          stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
                 } catch (IllegalAccessException e) {
                   throw new RuntimeException(
                       String.format(

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 01b2a72..fe3ae97 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -70,7 +70,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
     private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder;
     private MetricUpdates metricUpdates;
-    private CopyOnAccessInMemoryStateInternals<?> state;
+    private CopyOnAccessInMemoryStateInternals state;
     private TimerUpdate timerUpdate;
     private AggregatorContainer.Mutator aggregatorChanges;
     private final Set<OutputType> producedOutputs;
@@ -109,7 +109,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
       return this;
     }
 
-    public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) {
+    public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals state) {
       this.state = state;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 8bb5f93..bde44ca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -85,7 +85,7 @@ public interface TransformResult<InputT> {
    * <p>If this evaluation did not access state, this may return null.
    */
   @Nullable
-  CopyOnAccessInMemoryStateInternals<?> getState();
+  CopyOnAccessInMemoryStateInternals getState();
 
   /**
    * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 4d04745..3e29a69 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -72,7 +72,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -92,7 +92,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -114,18 +114,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
    */
   @Test
   public void testGetWithPresentInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
+    StateTag<ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
     ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.read(), nullValue(String.class));
 
     underlyingValue.write("bar");
     assertThat(underlyingValue.read(), equalTo("bar"));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     ValueState<String> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), equalTo("bar"));
@@ -140,18 +140,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testBagStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
     BagState<Integer> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.read(), emptyIterable());
 
     underlyingValue.add(1);
     assertThat(underlyingValue.read(), containsInAnyOrder(1));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -166,18 +166,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testSetStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
+    StateTag<SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
     SetState<Integer> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.read(), emptyIterable());
 
     underlyingValue.add(1);
     assertThat(underlyingValue.read(), containsInAnyOrder(1));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -192,11 +192,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testMapStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, MapState<String, Integer>> valueTag =
+    StateTag<MapState<String, Integer>> valueTag =
         StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
     MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag);
     assertThat(underlyingValue.entries().read(), emptyIterable());
@@ -204,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.put("hello", 1);
     assertThat(underlyingValue.get("hello").read(), equalTo(1));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag);
     assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
@@ -221,13 +221,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
-    StateTag<Object, CombiningState<Long, long[], Long>> stateTag =
+    StateTag<CombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
             sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
     GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
@@ -236,7 +236,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(1L);
     assertThat(underlyingValue.read(), equalTo(1L));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(1L));
@@ -251,13 +251,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testWatermarkHoldStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, WatermarkHoldState> stateTag =
+    StateTag<WatermarkHoldState> stateTag =
         StateTags.watermarkStateInternal("wmstate", timestampCombiner);
     WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), nullValue());
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     underlyingValue.add(new Instant(250L));
     assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
     WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
@@ -284,10 +284,10 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithoutUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -304,13 +304,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -331,15 +331,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithClearedInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
+    CopyOnAccessInMemoryStateInternals<String>secondUnderlying =
         spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying));
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -361,13 +361,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithOverwrittenUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -392,15 +392,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithAddedUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     internals.commit();
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -416,7 +416,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithEmptyTableIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     internals.commit();
@@ -426,11 +426,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithOnlyClearedValuesIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = internals.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -444,13 +444,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+    StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
     BagState<String> stringBag = underlying.state(namespace, bagTag);
     assertThat(stringBag.read(), emptyIterable());
 
@@ -475,16 +475,16 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         return new Instant(689743L);
       }
     };
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
 
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+    StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState firstHold =
         internals.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+    StateTag<WatermarkHoldState> secondHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -508,18 +508,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
         return new Instant(689743L);
       }
     };
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+    StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(22L));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+    StateTag<WatermarkHoldState> secondHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -545,18 +545,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
             return new Instant(689743L);
           }
         };
-    CopyOnAccessInMemoryStateInternals<String> underlying =
+    CopyOnAccessInMemoryStateInternals<String>underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState> firstHoldAddress =
+    StateTag<WatermarkHoldState> firstHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState firstHold =
         underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
     firstHold.add(new Instant(224L));
 
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
 
-    StateTag<Object, WatermarkHoldState> secondHoldAddress =
+    StateTag<WatermarkHoldState> secondHoldAddress =
         StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
     WatermarkHoldState secondHold =
         internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -568,7 +568,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
   @Test
   public void testGetEarliestHoldBeforeCommit() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
+    CopyOnAccessInMemoryStateInternals<String>internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
 
     internals

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 51ae12a..6f9adc4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -524,6 +524,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   private static class LongNoDecodeCoder extends CustomCoder<Long> {
+
     @Override
     public void encode(
         Long value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 0c3a8ed..bfbcd79 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -161,7 +161,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
         context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
         .getOrCreateStepContext("s1", "s1")
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
     DirectExecutionContext fooContext =
         context.getExecutionContext(createdProducer, myKey);
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
         .getOrCreateStepContext("s1", "s1")
@@ -246,9 +246,9 @@ public class EvaluationContextTest {
     DirectExecutionContext fooContext =
         context.getExecutionContext(downstreamProducer, myKey);
 
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+    StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
-    CopyOnAccessInMemoryStateInternals<Object> state =
+    CopyOnAccessInMemoryStateInternals<?> state =
         fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
     BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
     bag.add(1);
@@ -268,7 +268,7 @@ public class EvaluationContextTest {
     DirectExecutionContext afterResultContext =
         context.getExecutionContext(downstreamProducer, myKey);
 
-    CopyOnAccessInMemoryStateInternals<Object> afterResultState =
+    CopyOnAccessInMemoryStateInternals<?> afterResultState =
         afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
     assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index ecb8130..fc63406 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -92,7 +92,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
   @Mock private transient UncommittedBundle<Integer> mockUncommittedBundle;
 
   private static final String KEY = "any-key";
-  private transient StateInternals<Object> stateInternals =
+  private transient StateInternals stateInternals =
       CopyOnAccessInMemoryStateInternals.<Object>withUnderlying(KEY, null);
 
   private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
@@ -104,7 +104,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    when((StateInternals<Object>) mockStepContext.stateInternals()).thenReturn(stateInternals);
+    when((StateInternals) mockStepContext.stateInternals()).thenReturn(stateInternals);
     when(mockEvaluationContext.createSideInputReader(anyList()))
         .thenReturn(
             SideInputContainer.create(
@@ -133,7 +133,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
                     ParDo.of(
                             new DoFn<KV<String, Integer>, Integer>() {
                               @StateId(stateId)
-                              private final StateSpec<Object, ValueState<String>> spec =
+                              private final StateSpec<ValueState<String>> spec =
                                   StateSpecs.value(StringUtf8Coder.of());
 
                               @ProcessElement
@@ -165,7 +165,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
         StateNamespaces.window(IntervalWindow.getCoder(), firstWindow);
     StateNamespace secondWindowNamespace =
         StateNamespaces.window(IntervalWindow.getCoder(), secondWindow);
-    StateTag<Object, ValueState<String>> tag =
+    StateTag<ValueState<String>> tag =
         StateTags.tagForSpec(stateId, StateSpecs.value(StringUtf8Coder.of()));
 
     // Set up non-empty state. We don't mock + verify calls to clear() but instead
@@ -247,7 +247,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
                         .of(
                             new DoFn<KV<String, Integer>, Integer>() {
                               @StateId(stateId)
-                              private final StateSpec<Object, ValueState<String>> spec =
+                              private final StateSpec<ValueState<String>> spec =
                                   StateSpecs.value(StringUtf8Coder.of());
 
                               @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 847a00a..8640801 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -61,7 +61,7 @@ public class FlinkNoOpStepContext implements StepContext {
   }
 
   @Override
-  public StateInternals<?> stateInternals() {
+  public StateInternals stateInternals() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 879fad7..a79f856 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -123,7 +123,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
         Collections.<TupleTag<?>>emptyList(),
         new FlinkNoOpStepContext() {
           @Override
-          public StateInternals<?> stateInternals() {
+          public StateInternals stateInternals() {
             return stateInternals;
           }
           @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 01830de..d8fd79a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -133,7 +133,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient long currentOutputWatermark;
 
-  private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
+  private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
 
   protected transient FlinkStateInternals<?> stateInternals;
 
@@ -149,7 +149,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient FlinkTimerInternals timerInternals;
 
-  private transient StateInternals<?> pushbackStateInternals;
+  private transient StateInternals pushbackStateInternals;
 
   private transient Optional<Long> pushedBackWatermark;
 
@@ -673,7 +673,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
 
     @Override
-    public StateInternals<?> stateInternals() {
+    public StateInternals stateInternals() {
       return stateInternals;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index fb6762d..1887a99 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -94,10 +94,10 @@ public class SplittableDoFnOperator<
 
     StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
       @Override
-      public StateInternals<String> stateInternalsForKey(String key) {
+      public StateInternals stateInternalsForKey(String key) {
         //this will implicitly be keyed by the key of the incoming
         // element or by the key of a firing timer
-        return (StateInternals<String>) stateInternals;
+        return (StateInternals) stateInternals;
       }
     };
     TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9718734..3899303 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -83,10 +83,10 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
     StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
       @Override
-      public StateInternals<K> stateInternalsForKey(K key) {
+      public StateInternals stateInternalsForKey(K key) {
         //this will implicitly be keyed by the key of the incoming
         // element or by the key of a firing timer
-        return (StateInternals<K>) stateInternals;
+        return (StateInternals) stateInternals;
       }
     };
     TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {