You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:12 UTC

[1/6] beam git commit: Upgrade Dataflow worker image to beam-master-20170405

Repository: beam
Updated Branches:
  refs/heads/master bb1838476 -> b92032ff6


Upgrade Dataflow worker image to beam-master-20170405


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/359040e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/359040e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/359040e8

Branch: refs/heads/master
Commit: 359040e8f5151bde21752150030dd759766fc3e8
Parents: 33259d0
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 5 18:53:58 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/359040e8/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index fe531c7..2e3dc8a 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170329</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170405</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>


[4/6] beam git commit: Rename AccumulatorCombiningState to CombiningState

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 095ca2a..26f1c98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -115,7 +115,7 @@ public class GroupIntoBatches<K, InputT>
     private final StateSpec<Object, BagState<InputT>> batchSpec;
 
     @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-    private final StateSpec<Object, AccumulatorCombiningState<Long, Long, Long>>
+    private final StateSpec<Object, CombiningState<Long, Long, Long>>
         numElementsInBatchSpec;
 
     @StateId(KEY_ID)
@@ -171,7 +171,7 @@ public class GroupIntoBatches<K, InputT>
         @TimerId(END_OF_WINDOW_ID) Timer timer,
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-            AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+            CombiningState<Long, Long, Long> numElementsInBatch,
         @StateId(KEY_ID) ValueState<K> key,
         ProcessContext c,
         BoundedWindow window) {
@@ -203,7 +203,7 @@ public class GroupIntoBatches<K, InputT>
         @StateId(KEY_ID) ValueState<K> key,
         @StateId(BATCH_ID) BagState<InputT> batch,
         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
-            AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+            CombiningState<Long, Long, Long> numElementsInBatch,
         BoundedWindow window) {
       LOGGER.debug(
           "*** END OF WINDOW *** for timer timestamp {} in windows {}",
@@ -215,7 +215,7 @@ public class GroupIntoBatches<K, InputT>
         Context c,
         ValueState<K> key,
         BagState<InputT> batch,
-        AccumulatorCombiningState<Long, Long, Long> numElementsInBatch) {
+        CombiningState<Long, Long, Long> numElementsInBatch) {
       Iterable<InputT> values = batch.read();
       // when the timer fires, batch state might be empty
       if (Iterables.size(values) > 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
deleted file mode 100644
index 6b120f9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link GroupingState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
-    extends GroupingState<InputT, OutputT> {
-
-  /**
-   * Read the merged accumulator for this combining value. It is implied that reading the
-   * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
-   * this.
-   */
-  AccumT getAccum();
-
-  /**
-   * Add an accumulator to this combining value. Depending on implementation this may immediately
-   * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
-   */
-  void addAccum(AccumT accum);
-
-  /**
-   * Merge the given accumulators according to the underlying combiner.
-   */
-  AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
-  @Override
-  AccumulatorCombiningState<InputT, AccumT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
new file mode 100644
index 0000000..80e4dc9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
+ * to {@link GroupingState} that includes the {@code AccumT} type.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <AccumT> the type of accumulator
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface CombiningState<InputT, AccumT, OutputT>
+    extends GroupingState<InputT, OutputT> {
+
+  /**
+   * Read the merged accumulator for this combining value. It is implied that reading the
+   * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
+   * this.
+   */
+  AccumT getAccum();
+
+  /**
+   * Add an accumulator to this combining value. Depending on implementation this may immediately
+   * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
+   */
+  void addAccum(AccumT accum);
+
+  /**
+   * Merge the given accumulators according to the underlying combiner.
+   */
+  AccumT mergeAccumulators(Iterable<AccumT> accumulators);
+
+  @Override
+  CombiningState<InputT, AccumT, OutputT> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index fbfb475..98f7238 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,23 +39,23 @@ public interface StateBinder<K> {
       String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
-  <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
       String id,
-      StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+      StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
           String id,
-          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,
           Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
           String id,
-          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
               combineFn);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index db4b7de..974e11d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,7 +62,7 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-  StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
       CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
   }
@@ -72,7 +72,7 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
@@ -85,7 +85,7 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+  StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
       KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
@@ -95,7 +95,7 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
@@ -108,7 +108,7 @@ public class StateSpecs {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+  StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
   keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
     return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
@@ -118,7 +118,7 @@ public class StateSpecs {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
           keyedCombiningValueWithContext(
               Coder<AccumT> accumCoder,
               KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
@@ -137,7 +137,7 @@ public class StateSpecs {
    * only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
           combiningValueFromInputInternal(
               Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     try {
@@ -154,13 +154,13 @@ public class StateSpecs {
   }
 
   private static <InputT, AccumT, OutputT>
-      StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   private static <K, InputT, AccumT, OutputT>
-      StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
@@ -219,7 +219,7 @@ public class StateSpecs {
 
   public static <K, InputT, AccumT, OutputT>
       StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
-          StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
     if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
       // Checked above; conversion to a bag spec depends on the provided spec being one of those
       // created via the factory methods in this class.
@@ -302,7 +302,7 @@ public class StateSpecs {
    */
   private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
       extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
-      implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -339,7 +339,7 @@ public class StateSpecs {
    * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
    */
   private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
-      implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -353,7 +353,7 @@ public class StateSpecs {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+    public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
       return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
     }
@@ -410,7 +410,7 @@ public class StateSpecs {
    * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
    */
   private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
-      implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+      implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
@@ -428,7 +428,7 @@ public class StateSpecs {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+    public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
       return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index cc67ac2..d9b7b54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -77,6 +77,7 @@ import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.Mean.CountSum;
 import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -92,8 +93,8 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.StateSpec;
@@ -2085,7 +2086,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, SetState<Integer>> setState =
               StateSpecs.set(VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2093,7 +2094,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId) SetState<Integer> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             state.add(c.element().getValue());
             count.add(1);
@@ -2129,7 +2130,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2137,7 +2138,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId) SetState<MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
             state.add(new MyInteger(c.element().getValue()));
             count.add(1);
             if (count.read() >= 4) {
@@ -2172,7 +2173,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
 
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
@@ -2180,7 +2181,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId) SetState<MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+              @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
             state.add(new MyInteger(c.element().getValue()));
             count.add(1);
             if (count.read() >= 4) {
@@ -2214,14 +2215,14 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, MapState<String, Integer>> mapState =
               StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) MapState<String, Integer> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             KV<String, Integer> value = c.element().getValue();
             state.put(value.getKey(), value.getValue());
@@ -2260,14 +2261,14 @@ public class ParDoTest implements Serializable {
           @StateId(stateId)
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             KV<String, Integer> value = c.element().getValue();
             state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2306,14 +2307,14 @@ public class ParDoTest implements Serializable {
           @StateId(stateId)
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
-          private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+          private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
               countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
           public void processElement(
               ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-              @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+              @StateId(countStateId) CombiningState<Integer, int[], Integer>
                   count) {
             KV<String, Integer> value = c.element().getValue();
             state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2351,7 +2352,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(stateId)
           private final StateSpec<
-                  Object, AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double>>
+                  Object, CombiningState<Double, CountSum<Double>, Double>>
               combiningState =
                   StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
 
@@ -2359,7 +2360,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId)
-                  AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double> state) {
+                  CombiningState<Double, CountSum<Double>, Double> state) {
             state.add(c.element().getValue());
             Double currentValue = state.read();
             if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2391,7 +2392,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(stateId)
           private final StateSpec<
-              Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+              Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
               StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
@@ -2423,7 +2424,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId)
-                  AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+                  CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {
@@ -2453,7 +2454,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(stateId)
           private final StateSpec<
-              Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+              Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
               StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
@@ -2485,7 +2486,7 @@ public class ParDoTest implements Serializable {
           public void processElement(
               ProcessContext c,
               @StateId(stateId)
-                  AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+                  CombiningState<Integer, MyInteger, Integer> state) {
             state.add(c.element().getValue());
             Integer currentValue = state.read();
             if (currentValue == EXPECTED_SUM) {


[2/6] beam git commit: Rename combiningValue to combining in StateSpecs

Posted by ke...@apache.org.
Rename combiningValue to combining in StateSpecs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33259d05
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33259d05
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33259d05

Branch: refs/heads/master
Commit: 33259d05e7ddfca7d27974394ea9e94b2b4b985c
Parents: ef480a3
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:37:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/StateTags.java | 14 +--
 .../src/main/resources/beam/findbugs-filter.xml |  2 +-
 .../beam/sdk/transforms/GroupIntoBatches.java   |  2 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  6 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  | 90 ++++++++++----------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 18 ++--
 6 files changed, 66 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 4893919..77ae8f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -84,7 +84,7 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindCombining(
               String id,
               StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
@@ -94,7 +94,7 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
               String id,
               StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
@@ -104,7 +104,7 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
               String id,
               StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
@@ -162,7 +162,7 @@ public class StateTags {
     combiningValue(
       String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn));
+        new StructuredId(id), StateSpecs.combining(accumCoder, combineFn));
   }
 
   /**
@@ -174,7 +174,7 @@ public class StateTags {
       keyedCombiningValue(String id, Coder<AccumT> accumCoder,
           KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn));
+        new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn));
   }
 
   /**
@@ -188,7 +188,7 @@ public class StateTags {
           Coder<AccumT> accumCoder,
           KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
+        new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn));
   }
 
   /**
@@ -203,7 +203,7 @@ public class StateTags {
       combiningValueFromInputInternal(
           String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
+        new StructuredId(id), StateSpecs.combiningFromInputInternal(inputCoder, combineFn));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 2799b00..1c0f301 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -380,7 +380,7 @@
     <!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
   </Match>
   <Match>
-    <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/>
+    <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/>
     <Method name="equals"/>
     <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
     <!--[BEAM-421] Class doesn't override equals in superclass-->

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 26f1c98..2462b1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -132,7 +132,7 @@ public class GroupIntoBatches<K, InputT>
       this.allowedLateness = allowedLateness;
       this.batchSpec = StateSpecs.bag(inputValueCoder);
       this.numElementsInBatchSpec =
-          StateSpecs.combiningValue(
+          StateSpecs.combining(
               VarLongCoder.of(),
               new Combine.CombineFn<Long, Long, Long>() {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 98f7238..64841fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,21 +39,21 @@ public interface StateBinder<K> {
       String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
-  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
       String id,
       StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
           String id,
           StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,
           Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
           String id,
           StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 974e11d..30a7a6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,9 +62,9 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
       CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+    return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
@@ -72,12 +72,12 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
-            + "Consider using combiningValue(CombineFn<> combineFn) instead.");
-    return combiningValueInternal(accumCoder, combineFn);
+            + "Consider using combining(CombineFn<> combineFn) instead.");
+    return combiningInternal(accumCoder, combineFn);
   }
 
   /**
@@ -85,9 +85,9 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+  StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
       KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+    return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
@@ -95,12 +95,12 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
-            + "Consider using keyedCombiningValue(KeyedCombineFn<> combineFn) instead.");
-    return keyedCombiningValueInternal(accumCoder, combineFn);
+            + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead.");
+    return keyedCombiningInternal(accumCoder, combineFn);
   }
 
   /**
@@ -109,8 +109,8 @@ public class StateSpecs {
    */
   public static <K, InputT, AccumT, OutputT>
   StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
-  keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+  keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+    return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
@@ -119,13 +119,13 @@ public class StateSpecs {
    */
   public static <K, InputT, AccumT, OutputT>
       StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
-          keyedCombiningValueWithContext(
+  keyedCombiningWithContext(
               Coder<AccumT> accumCoder,
               KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. Consider using "
-            + "keyedCombiningValueWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
-    return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(
+            + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
+    return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(
         accumCoder, combineFn);
   }
 
@@ -138,11 +138,11 @@ public class StateSpecs {
    */
   public static <InputT, AccumT, OutputT>
       StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
-          combiningValueFromInputInternal(
+  combiningFromInputInternal(
               Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     try {
       Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
-      return combiningValueInternal(accumCoder, combineFn);
+      return combiningInternal(accumCoder, combineFn);
     } catch (CannotProvideCoderException e) {
       throw new IllegalArgumentException(
           "Unable to determine accumulator coder for "
@@ -154,15 +154,15 @@ public class StateSpecs {
   }
 
   private static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+    return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   private static <K, InputT, AccumT, OutputT>
-      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+    return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   /**
@@ -220,17 +220,17 @@ public class StateSpecs {
   public static <K, InputT, AccumT, OutputT>
       StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
           StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
-    if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
+    if (combiningSpec instanceof KeyedCombiningStateSpec) {
       // Checked above; conversion to a bag spec depends on the provided spec being one of those
       // created via the factory methods in this class.
       @SuppressWarnings("unchecked")
-      KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec =
-          (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+          (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
       return typedSpec.asBagSpec();
-    } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) {
+    } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) {
       @SuppressWarnings("unchecked")
-      KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
-          (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+          (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
       return typedSpec.asBagSpec();
     } else {
       throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
@@ -300,15 +300,15 @@ public class StateSpecs {
    *
    * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
    */
-  private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
-      extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
+  private static class CombiningStateSpec<InputT, AccumT, OutputT>
+      extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT>
       implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
     private final CombineFn<InputT, AccumT, OutputT> combineFn;
 
-    private CombiningValueStateSpec(
+    private CombiningStateSpec(
         @Nullable Coder<AccumT> accumCoder,
         CombineFn<InputT, AccumT, OutputT> combineFn) {
       super(accumCoder, combineFn.asKeyedFn());
@@ -338,14 +338,14 @@ public class StateSpecs {
    *
    * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
    */
-  private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
+  private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>
       implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
     private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
 
-    protected KeyedCombiningValueWithContextStateSpec(
+    protected KeyedCombiningWithContextStateSpec(
         @Nullable Coder<AccumT> accumCoder,
         KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
       this.combineFn = combineFn;
@@ -355,7 +355,7 @@ public class StateSpecs {
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
+      return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn);
     }
 
     @SuppressWarnings("unchecked")
@@ -371,9 +371,9 @@ public class StateSpecs {
     @Override public void finishSpecifying() {
       if (accumCoder == null) {
         throw new IllegalStateException("Unable to infer a coder for"
-            + " KeyedCombiningValueWithContextState and no Coder was specified."
+            + " KeyedCombiningWithContextState and no Coder was specified."
             + " Please set a coder by either invoking"
-            + " StateSpecs.keyedCombiningValue(Coder<AccumT> accumCoder,"
+            + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder,"
             + " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)"
             + " or by registering the coder in the Pipeline's CoderRegistry.");
       }
@@ -385,12 +385,12 @@ public class StateSpecs {
         return true;
       }
 
-      if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) {
+      if (!(obj instanceof KeyedCombiningWithContextStateSpec)) {
         return false;
       }
 
-      KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that =
-          (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj;
+      KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that =
+          (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj;
       return Objects.equals(this.accumCoder, that.accumCoder);
     }
 
@@ -409,14 +409,14 @@ public class StateSpecs {
    *
    * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
    */
-  private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
+  private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>
       implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
     private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
 
-    protected KeyedCombiningValueStateSpec(
+    protected KeyedCombiningStateSpec(
         @Nullable Coder<AccumT> accumCoder,
         KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
       this.keyedCombineFn = keyedCombineFn;
@@ -430,7 +430,7 @@ public class StateSpecs {
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
+      return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn);
     }
 
     @SuppressWarnings("unchecked")
@@ -447,7 +447,7 @@ public class StateSpecs {
       if (getAccumCoder() == null) {
         throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
             + " Coder was specified. Please set a coder by either invoking"
-            + " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
+            + " StateSpecs.combining(Coder<AccumT> accumCoder,"
             + " CombineFn<InputT, AccumT, OutputT> combineFn)"
             + " or by registering the coder in the Pipeline's CoderRegistry.");
       }
@@ -459,12 +459,12 @@ public class StateSpecs {
         return true;
       }
 
-      if (!(obj instanceof CombiningValueStateSpec)) {
+      if (!(obj instanceof CombiningStateSpec)) {
         return false;
       }
 
-      KeyedCombiningValueStateSpec<?, ?, ?, ?> that =
-          (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj;
+      KeyedCombiningStateSpec<?, ?, ?, ?> that =
+          (KeyedCombiningStateSpec<?, ?, ?, ?>) obj;
       return Objects.equals(this.accumCoder, that.accumCoder);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d9b7b54..e305da1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2087,7 +2087,7 @@ public class ParDoTest implements Serializable {
               StateSpecs.set(VarIntCoder.of());
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2131,7 +2131,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2174,7 +2174,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2216,7 +2216,7 @@ public class ParDoTest implements Serializable {
               StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2262,7 +2262,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2308,7 +2308,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2354,7 +2354,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<
                   Object, CombiningState<Double, CountSum<Double>, Double>>
               combiningState =
-                  StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+                  StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
 
           @ProcessElement
           public void processElement(
@@ -2394,7 +2394,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<
               Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
-              StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
                 public MyInteger createAccumulator() {
                   return new MyInteger(0);
@@ -2456,7 +2456,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<
               Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
-              StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
                 public MyInteger createAccumulator() {
                   return new MyInteger(0);


[5/6] beam git commit: Rename AccumulatorCombiningState to CombiningState

Posted by ke...@apache.org.
Rename AccumulatorCombiningState to CombiningState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef480a37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef480a37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef480a37

Branch: refs/heads/master
Commit: ef480a37ebe039d0eaa2d4ca758ea015893e9089
Parents: 24c0495
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:27:26 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700

----------------------------------------------------------------------
 .../translation/utils/ApexStateInternals.java   | 28 ++++----
 .../utils/ApexStateInternalsTest.java           | 14 ++--
 .../runners/core/InMemoryStateInternals.java    | 36 +++++------
 .../apache/beam/runners/core/NonEmptyPanes.java |  4 +-
 .../beam/runners/core/SideInputHandler.java     | 18 +++---
 .../apache/beam/runners/core/StateMerging.java  | 14 ++--
 .../org/apache/beam/runners/core/StateTag.java  | 14 ++--
 .../org/apache/beam/runners/core/StateTags.java | 24 +++----
 .../beam/runners/core/SystemReduceFn.java       |  4 +-
 .../AfterDelayFromFirstElementStateMachine.java |  8 +--
 .../core/triggers/AfterPaneStateMachine.java    |  4 +-
 .../core/InMemoryStateInternalsTest.java        | 14 ++--
 .../CopyOnAccessInMemoryStateInternals.java     | 46 +++++++------
 .../CopyOnAccessInMemoryStateInternalsTest.java |  6 +-
 .../state/FlinkBroadcastStateInternals.java     | 68 ++++++++++----------
 .../state/FlinkKeyGroupStateInternals.java      | 16 ++---
 .../state/FlinkSplitStateInternals.java         | 16 ++---
 .../streaming/state/FlinkStateInternals.java    | 68 ++++++++++----------
 .../FlinkBroadcastStateInternalsTest.java       | 14 ++--
 .../streaming/FlinkStateInternalsTest.java      | 14 ++--
 .../spark/stateful/SparkStateInternals.java     | 30 ++++-----
 .../beam/sdk/transforms/GroupIntoBatches.java   | 10 +--
 .../util/state/AccumulatorCombiningState.java   | 53 ---------------
 .../beam/sdk/util/state/CombiningState.java     | 53 +++++++++++++++
 .../apache/beam/sdk/util/state/StateBinder.java | 12 ++--
 .../apache/beam/sdk/util/state/StateSpecs.java  | 30 ++++-----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 39 +++++------
 27 files changed, 328 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 7634366..c59afc5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -139,12 +139,12 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new ApexAccumulatorCombiningState<>(
+      return new ApexCombiningState<>(
           namespace,
           address,
           accumCoder,
@@ -161,12 +161,12 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new ApexAccumulatorCombiningState<>(
+      return new ApexCombiningState<>(
           namespace,
           address,
           accumCoder,
@@ -174,9 +174,9 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
       return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -323,14 +323,14 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
 
   }
 
-  private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private final class ApexCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
     private final K key;
     private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
 
-    private ApexAccumulatorCombiningState(StateNamespace namespace,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+    private ApexCombiningState(StateNamespace namespace,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
       super(namespace, address, coder);
@@ -339,7 +339,7 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
     }
 
     @Override
-    public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
+    public ApexCombiningState<K, InputT, AccumT, OutputT> readLater() {
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index a1494ad..4f4ecfb 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -58,7 +58,7 @@ public class ApexStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -181,9 +181,9 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -202,11 +202,11 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index b4b2b38..0d5b058 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -38,8 +38,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -148,12 +148,12 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
+      return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
     }
 
     @Override
@@ -164,18 +164,18 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn);
+      return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn);
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
       return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -307,17 +307,17 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
   }
 
   /**
-   * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}.
+   * An {@link InMemoryState} implementation of {@link CombiningState}.
    */
-  public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
-          InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
+  public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT>,
+          InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> {
     private final K key;
     private boolean isCleared = true;
     private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
     private AccumT accum;
 
-    public InMemoryCombiningValue(
+    public InMemoryCombiningState(
         K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
       this.key = key;
       this.combineFn = combineFn;
@@ -325,7 +325,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() {
+    public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -384,9 +384,9 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
-      InMemoryCombiningValue<K, InputT, AccumT, OutputT> that =
-          new InMemoryCombiningValue<>(key, combineFn);
+    public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() {
+      InMemoryCombiningState<K, InputT, AccumT, OutputT> that =
+          new InMemoryCombiningState<>(key, combineFn);
       if (!this.isCleared) {
         that.isCleared = this.isCleared;
         that.addAccum(accum);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index aa033ce..3e875c2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
 
 /**
@@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
   private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
       extends NonEmptyPanes<K, W> {
 
-    private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+    private static final StateTag<Object, CombiningState<Long, long[], Long>>
         PANE_ADDITIONS_TAG =
         StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
             "count", VarLongCoder.of(), Sum.ofLongs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 24f326d..26e920a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.PCollectionView;
 
@@ -71,10 +71,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
       PCollectionView<?>,
       StateTag<
           Object,
-          AccumulatorCombiningState<
-              BoundedWindow,
-              Set<BoundedWindow>,
-              Set<BoundedWindow>>>> availableWindowsTags;
+          CombiningState<
+                        BoundedWindow,
+                        Set<BoundedWindow>,
+                        Set<BoundedWindow>>>> availableWindowsTags;
 
   /**
    * State tag for the actual contents of each side input per window.
@@ -106,10 +106,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
       StateTag<
           Object,
-          AccumulatorCombiningState<
-              BoundedWindow,
-              Set<BoundedWindow>,
-              Set<BoundedWindow>>> availableTag = StateTags.combiningValue(
+          CombiningState<
+                        BoundedWindow,
+                        Set<BoundedWindow>,
+                        Set<BoundedWindow>>> availableTag = StateTags.combiningValue(
           "side-input-available-windows-" + sideInput.getTagInternal().getId(),
           SetCoder.of(windowCoder),
           new WindowSetCombineFn());

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 593d697..3410850 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -24,8 +24,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -172,7 +172,7 @@ public class StateMerging {
    */
   public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
       MergingStateAccessor<K, W> context,
-      StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) {
+      StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) {
     mergeCombiningValues(
         context.accessInEachMergingWindow(address).values(), context.access(address));
   }
@@ -182,8 +182,8 @@ public class StateMerging {
    * {@code result}.
    */
   public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
-      Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources,
-      AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
+      Collection<CombiningState<InputT, AccumT, OutputT>> sources,
+      CombiningState<InputT, AccumT, OutputT> result) {
     if (sources.isEmpty()) {
       // Nothing to merge.
       return;
@@ -194,18 +194,18 @@ public class StateMerging {
     }
     // Prefetch.
     List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size());
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+    for (CombiningState<InputT, AccumT, OutputT> source : sources) {
       prefetchRead(source);
     }
     // Read.
     List<AccumT> accumulators = new ArrayList<>(futures.size());
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+    for (CombiningState<InputT, AccumT, OutputT> source : sources) {
       accumulators.add(source.getAccum());
     }
     // Merge (possibly update and return one of the existing accumulators).
     AccumT merged = result.mergeAccumulators(accumulators);
     // Clear sources.
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+    for (CombiningState<InputT, AccumT, OutputT> source : sources) {
       source.clear();
     }
     // Update result.

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index 802aede..12c59ad 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -94,20 +94,20 @@ public interface StateTag<K, StateT extends State> extends Serializable {
         StateTag<? super K, MapState<KeyT, ValueT>> spec,
         Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+    <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         CombineFn<InputT, AccumT, OutputT> combineFn);
 
     <InputT, AccumT, OutputT>
-    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+    CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
 
     <InputT, AccumT, OutputT>
-    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+    CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
         Coder<AccumT> accumCoder,
         KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
             combineFn);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 1c70dff..4893919 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -30,8 +30,8 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -84,9 +84,9 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
               String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
               CombineFn<InputT, AccumT, OutputT> combineFn) {
         return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
@@ -94,9 +94,9 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
               String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
               KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
         return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
@@ -104,9 +104,9 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
               String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
               KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
         return binder.bindKeyedCombiningValueWithContext(
@@ -158,7 +158,7 @@ public class StateTags {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-    StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+    StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
     combiningValue(
       String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -170,7 +170,7 @@ public class StateTags {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT,
-      OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>>
       keyedCombiningValue(String id, Coder<AccumT> accumCoder,
           KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -182,7 +182,7 @@ public class StateTags {
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateTag<K, CombiningState<InputT, AccumT, OutputT>>
       keyedCombiningValueWithContext(
           String id,
           Coder<AccumT> accumCoder,
@@ -199,7 +199,7 @@ public class StateTags {
    * should only be used to initialize static values.
    */
   public static <InputT, AccumT, OutputT>
-      StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+      StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
       combiningValueFromInputInternal(
           String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
@@ -255,7 +255,7 @@ public class StateTags {
 
   public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
       convertToBagTagInternal(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> combiningTag) {
     return new SimpleStateTag<>(
         new StructuredId(combiningTag.getId()),
         StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 0f2f790..f618d88 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 
@@ -71,7 +71,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
       AccumT, OutputT, W>
       combining(
           final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
+    final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag;
     if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
       bufferTag = StateTags.makeSystemTagInternal(
           StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 29c29a7..b416788 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -30,11 +30,11 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.Holder;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -55,8 +55,8 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
   protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
       ImmutableList.<SerializableFunction<Instant, Instant>>of();
 
-  protected static final StateTag<Object, AccumulatorCombiningState<Instant,
-                                              Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
+  protected static final StateTag<Object, CombiningState<Instant,
+                                                Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 1dd5b65..11323cc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -27,7 +27,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStat
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
 
 /**
  * {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane.
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 @Experimental(Experimental.Kind.TRIGGER)
 public class AfterPaneStateMachine extends OnceTriggerStateMachine {
 
-private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+private static final StateTag<Object, CombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
       StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
           "count", VarLongCoder.of(), Sum.ofLongs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 5f90084..e4fb5c1 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
@@ -61,7 +61,7 @@ public class InMemoryStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -411,9 +411,9 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -432,11 +432,11 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index ff5c23c..0665812 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryBag;
-import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningValue;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningState;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryMap;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemorySet;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState;
@@ -45,8 +45,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -306,19 +306,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
               bindCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
-              InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
-                  existingState = (
-                  InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
-                                            OutputT>>) underlying.get().get(namespace, address, c);
+              InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
+                  (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>)
+                      underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
-              return new InMemoryCombiningValue<>(
+              return new InMemoryCombiningState<>(
                   key, combineFn.asKeyedFn());
             }
           }
@@ -367,27 +366,26 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
               bindKeyedCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
-              InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
-                  existingState = (
-                  InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
-                                            OutputT>>) underlying.get().get(namespace, address, c);
+              InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
+                  (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>)
+                      underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
-              return new InMemoryCombiningValue<>(key, combineFn);
+              return new InMemoryCombiningState<>(key, combineFn);
             }
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
           bindKeyedCombiningValueWithContext(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
             return bindKeyedCombiningValue(
@@ -449,9 +447,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
               bindCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
             return underlying.get(namespace, address, c);
           }
@@ -476,18 +474,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
               bindKeyedCombiningValue(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
             return underlying.get(namespace, address, c);
           }
 
           @Override
-          public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
           bindKeyedCombiningValueWithContext(
-                  StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
                   KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
             return bindKeyedCombiningValue(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 59c0a37..142af32 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -45,8 +45,8 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
-    StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
+    StateTag<Object, CombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
             sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
     GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
@@ -259,7 +259,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     StateNamespace namespace = new StateNamespaceForTest("foo");
     CoderRegistry reg = pipeline.getCoderRegistry();
-    StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag =
+    StateTag<String, CombiningState<Long, long[], Long>> stateTag =
         StateTags.keyedCombiningValue(
             "summer",
             sumLongFn.getAccumulatorCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index bcc3660..3203446 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -133,23 +133,23 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
-        return new FlinkAccumulatorCombiningState<>(
+        return new FlinkCombiningState<>(
             stateBackend, address, combineFn, namespace, accumCoder);
       }
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkKeyedAccumulatorCombiningState<>(
+        return new FlinkKeyedCombiningState<>(
             stateBackend,
             address,
             combineFn,
@@ -160,12 +160,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkAccumulatorCombiningStateWithContext<>(
+        return new FlinkCombiningStateWithContext<>(
             stateBackend,
             address,
             combineFn,
@@ -464,17 +464,17 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private class FlinkCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
 
-    FlinkAccumulatorCombiningState(
+    FlinkCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -486,7 +486,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -566,8 +566,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkCombiningState<?, ?, ?, ?> that =
+          (FlinkCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -581,18 +581,18 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
 
-    FlinkKeyedAccumulatorCombiningState(
+    FlinkKeyedCombiningState(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -607,7 +607,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -706,8 +706,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
         return false;
       }
 
-      FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkKeyedCombiningState<?, ?, ?, ?> that =
+          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -721,20 +721,20 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT>
+  private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
     private final CombineWithContext.KeyedCombineFnWithContext<
         ? super K, InputT, AccumT, OutputT> combineFn;
     private final FlinkBroadcastStateInternals<K> flinkStateInternals;
     private final CombineWithContext.Context context;
 
-    FlinkAccumulatorCombiningStateWithContext(
+    FlinkCombiningStateWithContext(
         DefaultOperatorStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.KeyedCombineFnWithContext<
             ? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
@@ -752,7 +752,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -847,8 +847,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
+      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index a29b1b2..24b340e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -156,9 +156,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
         throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -166,8 +166,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
         throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
@@ -176,8 +176,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
@@ -190,7 +190,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
           StateTag<? super K, WatermarkHoldState<W>> address,
           OutputTimeFn<? super W> outputTimeFn) {
         throw new UnsupportedOperationException(
-            String.format("%s is not supported", AccumulatorCombiningState.class.getSimpleName()));
+            String.format("%s is not supported", CombiningState.class.getSimpleName()));
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index d9e87d1..2bf0bf1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -116,9 +116,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
         throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -126,8 +126,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
         throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
@@ -136,8 +136,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
@@ -150,7 +150,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
           StateTag<? super K, WatermarkHoldState<W>> address,
           OutputTimeFn<? super W> outputTimeFn) {
         throw new UnsupportedOperationException(
-            String.format("%s is not supported", AccumulatorCombiningState.class.getSimpleName()));
+            String.format("%s is not supported", CombiningState.class.getSimpleName()));
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 9033ba7..4f961e5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -142,23 +142,23 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
       bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
 
-        return new FlinkAccumulatorCombiningState<>(
+        return new FlinkCombiningState<>(
             flinkStateBackend, address, combineFn, namespace, accumCoder);
       }
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkKeyedAccumulatorCombiningState<>(
+        return new FlinkKeyedCombiningState<>(
             flinkStateBackend,
             address,
             combineFn,
@@ -169,12 +169,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
       @Override
       public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
           CombineWithContext.KeyedCombineFnWithContext<
               ? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkAccumulatorCombiningStateWithContext<>(
+        return new FlinkCombiningStateWithContext<>(
             flinkStateBackend,
             address,
             combineFn,
@@ -393,18 +393,18 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private static class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+  private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
 
-    FlinkAccumulatorCombiningState(
+    FlinkCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder) {
@@ -420,7 +420,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -546,8 +546,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkCombiningState<?, ?, ?, ?> that =
+          (FlinkCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -561,19 +561,19 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private static class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+  private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
     private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
     private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
     private final FlinkStateInternals<K> flinkStateInternals;
 
-    FlinkKeyedAccumulatorCombiningState(
+    FlinkKeyedCombiningState(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
         Coder<AccumT> accumCoder,
@@ -591,7 +591,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -721,8 +721,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
         return false;
       }
 
-      FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
+      FlinkKeyedCombiningState<?, ?, ?, ?> that =
+          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 
@@ -736,11 +736,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private static class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+  private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
 
     private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
     private final CombineWithContext.KeyedCombineFnWithContext<
         ? super K, InputT, AccumT, OutputT> combineFn;
     private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
@@ -748,9 +748,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     private final FlinkStateInternals<K> flinkStateInternals;
     private final CombineWithContext.Context context;
 
-    FlinkAccumulatorCombiningStateWithContext(
+    FlinkCombiningStateWithContext(
         KeyedStateBackend<ByteBuffer> flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         CombineWithContext.KeyedCombineFnWithContext<
             ? super K, InputT, AccumT, OutputT> combineFn,
         StateNamespace namespace,
@@ -771,7 +771,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -896,8 +896,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
         return false;
       }
 
-      FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
+      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
 
       return namespace.equals(that.namespace) && address.equals(that.address);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index f4e3ea8..7e7d1e1 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -32,8 +32,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkB
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -58,7 +58,7 @@ public class FlinkBroadcastStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -202,9 +202,9 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -223,11 +223,11 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 27747dd..d140271 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -72,7 +72,7 @@ public class FlinkStateInternalsTest {
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
   private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -232,9 +232,9 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
 
     value1.add(5);
@@ -253,11 +253,11 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+    CombiningState<Integer, int[], Integer> value1 =
         underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+    CombiningState<Integer, int[], Integer> value2 =
         underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+    CombiningState<Integer, int[], Integer> value3 =
         underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
 
     value1.add(5);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 43fb383..725e9d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -137,31 +137,31 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key,
+      return new SparkCombiningState<>(namespace, address, accumCoder, key,
           combineFn.<K>asKeyedFn());
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, combineFn);
+      return new SparkCombiningState<>(namespace, address, accumCoder, key, combineFn);
     }
 
     @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
         bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key,
+      return new SparkCombiningState<>(namespace, address, accumCoder, key,
           CombineFnUtil.bindContext(combineFn, c));
     }
 
@@ -300,16 +300,16 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
   }
 
-  private class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+  private class SparkCombiningState<K, InputT, AccumT, OutputT>
       extends AbstractState<AccumT>
-          implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+          implements CombiningState<InputT, AccumT, OutputT> {
 
     private final K key;
     private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
 
-    private SparkAccumulatorCombiningState(
+    private SparkCombiningState(
         StateNamespace namespace,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
         K key,
         KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
@@ -319,7 +319,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
+    public SparkCombiningState<K, InputT, AccumT, OutputT> readLater() {
       return this;
     }
 


[6/6] beam git commit: This closes #2413: Rename CombiningState and friends to be more user-friendly

Posted by ke...@apache.org.
This closes #2413: Rename CombiningState and friends to be more user-friendly

  Upgrade Dataflow worker image to beam-master-20170405
  Rename combiningValue to combining in StateSpecs
  Rename AccumulatorCombiningState to CombiningState
  Rename CombiningState to GroupingState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b92032ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b92032ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b92032ff

Branch: refs/heads/master
Commit: b92032ff6d3fba7d29dd73f602977137b7539482
Parents: bb18384 359040e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 6 15:33:28 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 15:33:28 2017 -0700

----------------------------------------------------------------------
 .../translation/utils/ApexStateInternals.java   |  28 ++---
 .../utils/ApexStateInternalsTest.java           |  18 +--
 .../runners/core/InMemoryStateInternals.java    |  36 +++---
 .../apache/beam/runners/core/NonEmptyPanes.java |   4 +-
 .../beam/runners/core/SideInputHandler.java     |  18 +--
 .../apache/beam/runners/core/StateMerging.java  |  16 +--
 .../org/apache/beam/runners/core/StateTag.java  |  14 +--
 .../org/apache/beam/runners/core/StateTags.java |  32 +++---
 .../beam/runners/core/SystemReduceFn.java       |   8 +-
 .../AfterDelayFromFirstElementStateMachine.java |  10 +-
 .../core/triggers/AfterPaneStateMachine.java    |   4 +-
 .../core/InMemoryStateInternalsTest.java        |  18 +--
 .../CopyOnAccessInMemoryStateInternals.java     |  46 ++++----
 .../CopyOnAccessInMemoryStateInternalsTest.java |  18 +--
 .../state/FlinkBroadcastStateInternals.java     |  68 ++++++------
 .../state/FlinkKeyGroupStateInternals.java      |  16 +--
 .../state/FlinkSplitStateInternals.java         |  16 +--
 .../streaming/state/FlinkStateInternals.java    |  68 ++++++------
 .../FlinkBroadcastStateInternalsTest.java       |  18 +--
 .../streaming/FlinkStateInternalsTest.java      |  18 +--
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../spark/stateful/SparkStateInternals.java     |  30 ++---
 .../src/main/resources/beam/findbugs-filter.xml |   2 +-
 .../beam/sdk/transforms/GroupIntoBatches.java   |  12 +-
 .../util/state/AccumulatorCombiningState.java   |  53 ---------
 .../apache/beam/sdk/util/state/BagState.java    |   2 +-
 .../beam/sdk/util/state/CombiningState.java     |  27 +++--
 .../beam/sdk/util/state/GroupingState.java      |  42 +++++++
 .../apache/beam/sdk/util/state/SetState.java    |   2 +-
 .../org/apache/beam/sdk/util/state/State.java   |   2 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  12 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  | 110 +++++++++----------
 .../beam/sdk/util/state/WatermarkHoldState.java |   2 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  59 +++++-----
 34 files changed, 415 insertions(+), 416 deletions(-)
----------------------------------------------------------------------



[3/6] beam git commit: Rename CombiningState to GroupingState

Posted by ke...@apache.org.
Rename CombiningState to GroupingState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24c0495a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24c0495a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24c0495a

Branch: refs/heads/master
Commit: 24c0495a22dec9b7c44942794831b284f8caf78c
Parents: 0a17645
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:26:30 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700

----------------------------------------------------------------------
 .../utils/ApexStateInternalsTest.java           |  6 +--
 .../apache/beam/runners/core/StateMerging.java  |  4 +-
 .../beam/runners/core/SystemReduceFn.java       |  6 +--
 .../AfterDelayFromFirstElementStateMachine.java |  4 +-
 .../core/InMemoryStateInternalsTest.java        |  6 +--
 .../CopyOnAccessInMemoryStateInternalsTest.java | 14 +++----
 .../FlinkBroadcastStateInternalsTest.java       |  6 +--
 .../streaming/FlinkStateInternalsTest.java      |  6 +--
 .../util/state/AccumulatorCombiningState.java   |  4 +-
 .../apache/beam/sdk/util/state/BagState.java    |  2 +-
 .../beam/sdk/util/state/CombiningState.java     | 42 --------------------
 .../beam/sdk/util/state/GroupingState.java      | 42 ++++++++++++++++++++
 .../apache/beam/sdk/util/state/SetState.java    |  2 +-
 .../org/apache/beam/sdk/util/state/State.java   |  2 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  2 +-
 .../beam/sdk/util/state/WatermarkHoldState.java |  2 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  2 +-
 17 files changed, 76 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 3e83a7f..a1494ad 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -148,7 +148,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -168,7 +168,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index e98d098..593d697 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -159,7 +159,7 @@ public class StateMerging {
    * Prefetch all combining value state for {@code address} across all merging windows in {@code
    * context}.
    */
-  public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void
+  public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void
       prefetchCombiningValues(MergingStateAccessor<K, W> context,
           StateTag<? super K, StateT> address) {
     for (StateT state : context.accessInEachMergingWindow(address).values()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index bb7e4a9..0f2f790 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 
 /**
@@ -97,10 +97,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
     };
   }
 
-  private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
+  private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag;
 
   public SystemReduceFn(
-      StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
+      StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) {
     this.bufferTag = bufferTag;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 4444c22..29c29a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;
@@ -169,7 +169,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
 
   @Override
   public void onElement(OnElementContext c) throws Exception {
-    CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
+    GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
     Instant oldDelayUntil = delayUntilState.read();
 
     // Since processing time can only advance, resulting in target wake-up times we would

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 1da946f..5f90084 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -378,7 +378,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -398,7 +398,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index c7409bb..59c0a37 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -232,7 +232,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
             sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
-    CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(0L));
 
     underlyingValue.add(1L);
@@ -240,14 +240,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+    GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(1L));
 
     copyOnAccessState.add(4L);
     assertThat(copyOnAccessState.read(), equalTo(5L));
     assertThat(underlyingValue.read(), equalTo(1L));
 
-    CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
   }
 
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
             sumLongFn.getAccumulatorCoder(
                 reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
             sumLongFn);
-    CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(0L));
 
     underlyingValue.add(1L);
@@ -273,14 +273,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+    GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(1L));
 
     copyOnAccessState.add(4L);
     assertThat(copyOnAccessState.read(), equalTo(5L));
     assertThat(underlyingValue.read(), equalTo(1L));
 
-    CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index db02cb3..f4e3ea8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -169,7 +169,7 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -189,7 +189,7 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 7839cf3..27747dd 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -199,7 +199,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -219,7 +219,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
index 8dd1678..6b120f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
@@ -21,14 +21,14 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 
 /**
  * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link CombiningState} that includes the {@code AccumT} type.
+ * to {@link GroupingState} that includes the {@code AccumT} type.
  *
  * @param <InputT> the type of values added to the state
  * @param <AccumT> the type of accumulator
  * @param <OutputT> the type of value extracted from the state
  */
 public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
-    extends CombiningState<InputT, OutputT> {
+    extends GroupingState<InputT, OutputT> {
 
   /**
    * Read the merged accumulator for this combining value. It is implied that reading the

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
index c7e6d13..e0eebe5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
@@ -22,7 +22,7 @@ package org.apache.beam.sdk.util.state;
  *
  * @param <T> The type of elements in the bag.
  */
-public interface BagState<T> extends CombiningState<T, Iterable<T>> {
+public interface BagState<T> extends GroupingState<T, Iterable<T>> {
   @Override
   BagState<T> readLater();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
deleted file mode 100644
index 1155262..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, OutputT> extends ReadableState<OutputT>, State {
-  /**
-   * Add a value to the buffer.
-   */
-  void add(InputT value);
-
-  /**
-   * Return true if this state is empty.
-   */
-  ReadableState<Boolean> isEmpty();
-
-  @Override
-  CombiningState<InputT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
new file mode 100644
index 0000000..bd7a8d9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
+ * {@code OutputT} value.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
+  /**
+   * Add a value to the buffer.
+   */
+  void add(InputT value);
+
+  /**
+   * Return true if this state is empty.
+   */
+  ReadableState<Boolean> isEmpty();
+
+  @Override
+  GroupingState<InputT, OutputT> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
index 93058b2..5c907d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
@@ -23,7 +23,7 @@ package org.apache.beam.sdk.util.state;
  *
  * @param <T> The type of elements in the set.
  */
-public interface SetState<T> extends CombiningState<T, Iterable<T>> {
+public interface SetState<T> extends GroupingState<T, Iterable<T>> {
   /**
    * Returns true if this set contains the specified element.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
index 973cb9c..3a49f01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.util.state;
  * Base interface for all state locations.
  *
  * <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link CombiningState}.
+ * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
  */
 public interface State {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 6a8c80b..db4b7de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -445,7 +445,7 @@ public class StateSpecs {
 
     @Override public void finishSpecifying() {
       if (getAccumCoder() == null) {
-        throw new IllegalStateException("Unable to infer a coder for CombiningState and no"
+        throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
             + " Coder was specified. Please set a coder by either invoking"
             + " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
             + " CombineFn<InputT, AccumT, OutputT> combineFn)"

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index 415cc6e..20fa05f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
  */
 @Experimental(Kind.STATE)
 public interface WatermarkHoldState<W extends BoundedWindow>
-    extends CombiningState<Instant, Instant> {
+    extends GroupingState<Instant, Instant> {
   /**
    * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
    * an element timestamp, and to combine watermarks from windows which are about to be merged.

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 4249a77..cc67ac2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2495,7 +2495,7 @@ public class ParDoTest implements Serializable {
         };
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified.");
+    thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified.");
 
     pipeline
         .apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))