You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:14 UTC

[3/6] beam git commit: Rename CombiningState to GroupingState

Rename CombiningState to GroupingState


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24c0495a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24c0495a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24c0495a

Branch: refs/heads/master
Commit: 24c0495a22dec9b7c44942794831b284f8caf78c
Parents: 0a17645
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:26:30 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700

----------------------------------------------------------------------
 .../utils/ApexStateInternalsTest.java           |  6 +--
 .../apache/beam/runners/core/StateMerging.java  |  4 +-
 .../beam/runners/core/SystemReduceFn.java       |  6 +--
 .../AfterDelayFromFirstElementStateMachine.java |  4 +-
 .../core/InMemoryStateInternalsTest.java        |  6 +--
 .../CopyOnAccessInMemoryStateInternalsTest.java | 14 +++----
 .../FlinkBroadcastStateInternalsTest.java       |  6 +--
 .../streaming/FlinkStateInternalsTest.java      |  6 +--
 .../util/state/AccumulatorCombiningState.java   |  4 +-
 .../apache/beam/sdk/util/state/BagState.java    |  2 +-
 .../beam/sdk/util/state/CombiningState.java     | 42 --------------------
 .../beam/sdk/util/state/GroupingState.java      | 42 ++++++++++++++++++++
 .../apache/beam/sdk/util/state/SetState.java    |  2 +-
 .../org/apache/beam/sdk/util/state/State.java   |  2 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  |  2 +-
 .../beam/sdk/util/state/WatermarkHoldState.java |  2 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  2 +-
 17 files changed, 76 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 3e83a7f..a1494ad 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -148,7 +148,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -168,7 +168,7 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index e98d098..593d697 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.State;
@@ -159,7 +159,7 @@ public class StateMerging {
    * Prefetch all combining value state for {@code address} across all merging windows in {@code
    * context}.
    */
-  public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void
+  public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void
       prefetchCombiningValues(MergingStateAccessor<K, W> context,
           StateTag<? super K, StateT> address) {
     for (StateT state : context.accessInEachMergingWindow(address).values()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index bb7e4a9..0f2f790 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 
 /**
@@ -97,10 +97,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
     };
   }
 
-  private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
+  private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag;
 
   public SystemReduceFn(
-      StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
+      StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) {
     this.bufferTag = bufferTag;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 4444c22..29c29a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;
@@ -169,7 +169,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
 
   @Override
   public void onElement(OnElementContext c) throws Exception {
-    CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
+    GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
     Instant oldDelayUntil = delayUntilState.read();
 
     // Since processing time can only advance, resulting in target wake-up times we would

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 1da946f..5f90084 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.SetState;
@@ -378,7 +378,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -398,7 +398,7 @@ public class InMemoryStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index c7409bb..59c0a37 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.MapState;
 import org.apache.beam.sdk.util.state.SetState;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -232,7 +232,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
             sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
-    CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(0L));
 
     underlyingValue.add(1L);
@@ -240,14 +240,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+    GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(1L));
 
     copyOnAccessState.add(4L);
     assertThat(copyOnAccessState.read(), equalTo(5L));
     assertThat(underlyingValue.read(), equalTo(1L));
 
-    CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
   }
 
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
             sumLongFn.getAccumulatorCoder(
                 reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
             sumLongFn);
-    CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(0L));
 
     underlyingValue.add(1L);
@@ -273,14 +273,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
 
     CopyOnAccessInMemoryStateInternals<String> internals =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+    GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
     assertThat(copyOnAccessState.read(), equalTo(1L));
 
     copyOnAccessState.add(4L);
     assertThat(copyOnAccessState.read(), equalTo(5L));
     assertThat(underlyingValue.read(), equalTo(1L));
 
-    CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+    GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index db02cb3..f4e3ea8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -169,7 +169,7 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -189,7 +189,7 @@ public class FlinkBroadcastStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 7839cf3..27747dd 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -199,7 +199,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     // State instances are cached, but depend on the namespace.
     assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -219,7 +219,7 @@ public class FlinkStateInternalsTest {
 
   @Test
   public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
 
     assertThat(value.isEmpty().read(), Matchers.is(true));
     ReadableState<Boolean> readFuture = value.isEmpty();

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
index 8dd1678..6b120f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
@@ -21,14 +21,14 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 
 /**
  * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link CombiningState} that includes the {@code AccumT} type.
+ * to {@link GroupingState} that includes the {@code AccumT} type.
  *
  * @param <InputT> the type of values added to the state
  * @param <AccumT> the type of accumulator
  * @param <OutputT> the type of value extracted from the state
  */
 public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
-    extends CombiningState<InputT, OutputT> {
+    extends GroupingState<InputT, OutputT> {
 
   /**
    * Read the merged accumulator for this combining value. It is implied that reading the

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
index c7e6d13..e0eebe5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
@@ -22,7 +22,7 @@ package org.apache.beam.sdk.util.state;
  *
  * @param <T> The type of elements in the bag.
  */
-public interface BagState<T> extends CombiningState<T, Iterable<T>> {
+public interface BagState<T> extends GroupingState<T, Iterable<T>> {
   @Override
   BagState<T> readLater();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
deleted file mode 100644
index 1155262..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, OutputT> extends ReadableState<OutputT>, State {
-  /**
-   * Add a value to the buffer.
-   */
-  void add(InputT value);
-
-  /**
-   * Return true if this state is empty.
-   */
-  ReadableState<Boolean> isEmpty();
-
-  @Override
-  CombiningState<InputT, OutputT> readLater();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
new file mode 100644
index 0000000..bd7a8d9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
+ * {@code OutputT} value.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
+  /**
+   * Add a value to the buffer.
+   */
+  void add(InputT value);
+
+  /**
+   * Return true if this state is empty.
+   */
+  ReadableState<Boolean> isEmpty();
+
+  @Override
+  GroupingState<InputT, OutputT> readLater();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
index 93058b2..5c907d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
@@ -23,7 +23,7 @@ package org.apache.beam.sdk.util.state;
  *
  * @param <T> The type of elements in the set.
  */
-public interface SetState<T> extends CombiningState<T, Iterable<T>> {
+public interface SetState<T> extends GroupingState<T, Iterable<T>> {
   /**
    * Returns true if this set contains the specified element.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
index 973cb9c..3a49f01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.util.state;
  * Base interface for all state locations.
  *
  * <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link CombiningState}.
+ * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
  */
 public interface State {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 6a8c80b..db4b7de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -445,7 +445,7 @@ public class StateSpecs {
 
     @Override public void finishSpecifying() {
       if (getAccumCoder() == null) {
-        throw new IllegalStateException("Unable to infer a coder for CombiningState and no"
+        throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
             + " Coder was specified. Please set a coder by either invoking"
             + " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
             + " CombineFn<InputT, AccumT, OutputT> combineFn)"

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index 415cc6e..20fa05f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
  */
 @Experimental(Kind.STATE)
 public interface WatermarkHoldState<W extends BoundedWindow>
-    extends CombiningState<Instant, Instant> {
+    extends GroupingState<Instant, Instant> {
   /**
    * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
    * an element timestamp, and to combine watermarks from windows which are about to be merged.

http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 4249a77..cc67ac2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2495,7 +2495,7 @@ public class ParDoTest implements Serializable {
         };
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified.");
+    thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified.");
 
     pipeline
         .apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))