You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:15 UTC

[4/6] beam git commit: Rename AccumulatorCombiningState to CombiningState

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 095ca2a..26f1c98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -115,7 +115,7 @@ public class GroupIntoBatches<K, InputT>
     private final StateSpec<Object, BagState<InputT>> batchSpec;
 
     @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-    private final StateSpec<Object, AccumulatorCombiningState<Long, Long, Long>>
+    private final StateSpec<Object, CombiningState<Long, Long, Long>>
         numElementsInBatchSpec;
 
     @StateId(KEY_ID)
@@ -171,7 +171,7 @@ public class GroupIntoBatches<K, InputT>
         @TimerId(END_OF_WINDOW_ID) Timer timer,
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-            AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+            CombiningState<Long, Long, Long> numElementsInBatch,
         @StateId(KEY_ID) ValueState<K> key,
         ProcessContext c,
         BoundedWindow window) {
@@ -203,7 +203,7 @@ public class GroupIntoBatches<K, InputT>
         @StateId(KEY_ID) ValueState<K> key,
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-            AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+            CombiningState<Long, Long, Long> numElementsInBatch,
         BoundedWindow window) {
       LOGGER.debug(
           "*** END OF WINDOW *** for timer timestamp {} in windows {}",
@@ -215,7 +215,7 @@ public class GroupIntoBatches<K, InputT>
         Context c,
         ValueState<K> key,
         BagState<InputT> batch,
-        AccumulatorCombiningState<Long, Long, Long> numElementsInBatch) {
+        CombiningState<Long, Long, Long> numElementsInBatch) {
       Iterable<InputT> values = batch.read();
       // when the timer fires, batch state might be empty
       if (Iterables.size(values) > 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
deleted file mode 100644
index 6b120f9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link GroupingState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
-    extends GroupingState<InputT, OutputT> {
-
-  /**
-   * Read the merged accumulator for this combining value. It is implied that reading the
-   * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
-   * this.
-   */
-  AccumT getAccum();
-
-  /**
-   * Add an accumulator to this combining value. Depending on implementation this may immediately
-   * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
-   */
-  void addAccum(AccumT accum);
-
-  /**
-   * Merge the given accumulators according to the underlying combiner.
-   */
-  AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
-  @Override
-  AccumulatorCombiningState<InputT, AccumT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
new file mode 100644
index 0000000..80e4dc9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
+ * to {@link GroupingState} that includes the {@code AccumT} type.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <AccumT> the type of accumulator
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface CombiningState<InputT, AccumT, OutputT>
+    extends GroupingState<InputT, OutputT> {
+
+  /**
+   * Read the merged accumulator for this combining value. It is implied that reading the
+   * state involves reading the accumulator, so {@link #readLater} is sufficient to prefetch for
+   * this.
+   */
+  AccumT getAccum();
+
+  /**
+   * Add an accumulator to this combining value. Depending on implementation this may immediately
+   * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
+   */
+  void addAccum(AccumT accum);
+
+  /**
+   * Merge the given accumulators according to the underlying combiner.
+   */
+  AccumT mergeAccumulators(Iterable<AccumT> accumulators);
+
+  @Override
+  CombiningState<InputT, AccumT, OutputT> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index fbfb475..98f7238 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,23 +39,23 @@ public interface StateBinder<K> {
       String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
-  <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
       String id,
-      StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+      StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
           String id,
-          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,
           Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
           String id,
-          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
               combineFn);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index db4b7de..974e11d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,7 +62,7 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-  StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
       CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
   }
@@ -72,7 +72,7 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
@@ -85,7 +85,7 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+  StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
       KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
@@ -95,7 +95,7 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
@@ -108,7 +108,7 @@ public class StateSpecs {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+  StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
   keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
     return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
@@ -118,7 +118,7 @@ public class StateSpecs {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
           keyedCombiningValueWithContext(
               Coder<AccumT> accumCoder,
               KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
@@ -137,7 +137,7 @@ public class StateSpecs {
    * only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
           combiningValueFromInputInternal(
               Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     try {
@@ -154,13 +154,13 @@ public class StateSpecs {
   }
 
   private static <InputT, AccumT, OutputT>
-      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   private static <K, InputT, AccumT, OutputT>
-      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
@@ -219,7 +219,7 @@ public class StateSpecs {
 
   public static <K, InputT, AccumT, OutputT>
       StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
-          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
     if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
       // Checked above; conversion to a bag spec depends on the provided spec being one of those
       // created via the factory methods in this class.
@@ -302,7 +302,7 @@ public class StateSpecs {
    */
   private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
       extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
-      implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -339,7 +339,7 @@ public class StateSpecs {
    * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
    */
   private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
-      implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -353,7 +353,7 @@ public class StateSpecs {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+    public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
       return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
     }
@@ -410,7 +410,7 @@ public class StateSpecs {
    * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
    */
   private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
-      implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -428,7 +428,7 @@ public class StateSpecs {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+    public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
       return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index cc67ac2..d9b7b54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -77,6 +77,7 @@ import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.Mean.CountSum;
 import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -92,8 +93,8 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.StateSpec;
@@ -2085,7 +2086,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, SetState<Integer>> setState =
               StateSpecs.set(VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2093,7 +2094,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId) SetState<Integer> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             state.add(c.element().getValue());
             count.add(1);
@@ -2129,7 +2130,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2137,7 +2138,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId) SetState<MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
             state.add(new MyInteger(c.element().getValue()));
             count.add(1);
             if (count.read() >= 4) {
@@ -2172,7 +2173,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2180,7 +2181,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId) SetState<MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
             state.add(new MyInteger(c.element().getValue()));
             count.add(1);
             if (count.read() >= 4) {
@@ -2214,14 +2215,14 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, MapState<String, Integer>> mapState =
               StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) MapState<String, Integer> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             KV<String, Integer> value = c.element().getValue();
             state.put(value.getKey(), value.getValue());
@@ -2260,14 +2261,14 @@ public class ParDoTest implements Serializable {
           @StateId(stateId)
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             KV<String, Integer> value = c.element().getValue();
             state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2306,14 +2307,14 @@ public class ParDoTest implements Serializable {
           @StateId(stateId)
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             KV<String, Integer> value = c.element().getValue();
             state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2351,7 +2352,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(stateId)
           private final StateSpec<
-                  Object, AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double>>
+                  Object, CombiningState<Double, CountSum<Double>, Double>>
               combiningState =
                   StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
 
@@ -2359,7 +2360,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId)
-                  AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double> state) {
+                  CombiningState<Double, CountSum<Double>, Double> state) {
             state.add(c.element().getValue());
             Double currentValue = state.read();
             if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2391,7 +2392,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(stateId)
           private final StateSpec<
-              Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+              Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
               StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
@@ -2423,7 +2424,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId)
-                  AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+                  CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
@@ -2453,7 +2454,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(stateId)
           private final StateSpec<
-              Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+              Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
               StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
@@ -2485,7 +2486,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId)
-                  AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+                  CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {