You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:13 UTC

[2/6] beam git commit: Rename combiningValue to combining in StateSpecs

Rename combiningValue to combining in StateSpecs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33259d05
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33259d05
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33259d05

Branch: refs/heads/master
Commit: 33259d05e7ddfca7d27974394ea9e94b2b4b985c
Parents: ef480a3
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:37:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/StateTags.java | 14 +--
 .../src/main/resources/beam/findbugs-filter.xml |  2 +-
 .../beam/sdk/transforms/GroupIntoBatches.java   |  2 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  6 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  | 90 ++++++++++----------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 18 ++--
 6 files changed, 66 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 4893919..77ae8f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -84,7 +84,7 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindCombining(
               String id,
               StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
@@ -94,7 +94,7 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
               String id,
               StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
@@ -104,7 +104,7 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
               String id,
               StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
@@ -162,7 +162,7 @@ public class StateTags {
     combiningValue(
       String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn));
+        new StructuredId(id), StateSpecs.combining(accumCoder, combineFn));
   }
 
   /**
@@ -174,7 +174,7 @@ public class StateTags {
       keyedCombiningValue(String id, Coder<AccumT> accumCoder,
           KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn));
+        new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn));
   }
 
   /**
@@ -188,7 +188,7 @@ public class StateTags {
           Coder<AccumT> accumCoder,
           KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
+        new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn));
   }
 
   /**
@@ -203,7 +203,7 @@ public class StateTags {
       combiningValueFromInputInternal(
           String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
+        new StructuredId(id), StateSpecs.combiningFromInputInternal(inputCoder, combineFn));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 2799b00..1c0f301 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -380,7 +380,7 @@
     <!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
   </Match>
   <Match>
-    <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/>
+    <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/>
     <Method name="equals"/>
     <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
     <!--[BEAM-421] Class doesn't override equals in superclass-->

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 26f1c98..2462b1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -132,7 +132,7 @@ public class GroupIntoBatches<K, InputT>
       this.allowedLateness = allowedLateness;
       this.batchSpec = StateSpecs.bag(inputValueCoder);
       this.numElementsInBatchSpec =
-          StateSpecs.combiningValue(
+          StateSpecs.combining(
               VarLongCoder.of(),
               new Combine.CombineFn<Long, Long, Long>() {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 98f7238..64841fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,21 +39,21 @@ public interface StateBinder<K> {
       String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
 
-  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
       String id,
       StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
       Coder<AccumT> accumCoder,
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
           String id,
           StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,
           Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
 
   <InputT, AccumT, OutputT>
-  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
           String id,
           StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
           Coder<AccumT> accumCoder,

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 974e11d..30a7a6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,9 +62,9 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
       CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+    return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
@@ -72,12 +72,12 @@ public class StateSpecs {
    * {@code InputT}s into a single {@code OutputT}.
    */
   public static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
-            + "Consider using combiningValue(CombineFn<> combineFn) instead.");
-    return combiningValueInternal(accumCoder, combineFn);
+            + "Consider using combining(CombineFn<> combineFn) instead.");
+    return combiningInternal(accumCoder, combineFn);
   }
 
   /**
@@ -85,9 +85,9 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+  StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
       KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+    return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
@@ -95,12 +95,12 @@ public class StateSpecs {
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
   public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
-            + "Consider using keyedCombiningValue(KeyedCombineFn<> combineFn) instead.");
-    return keyedCombiningValueInternal(accumCoder, combineFn);
+            + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead.");
+    return keyedCombiningInternal(accumCoder, combineFn);
   }
 
   /**
@@ -109,8 +109,8 @@ public class StateSpecs {
    */
   public static <K, InputT, AccumT, OutputT>
   StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
-  keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+  keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+    return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
   }
 
   /**
@@ -119,13 +119,13 @@ public class StateSpecs {
    */
   public static <K, InputT, AccumT, OutputT>
       StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
-          keyedCombiningValueWithContext(
+  keyedCombiningWithContext(
               Coder<AccumT> accumCoder,
               KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. Consider using "
-            + "keyedCombiningValueWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
-    return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(
+            + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
+    return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(
         accumCoder, combineFn);
   }
 
@@ -138,11 +138,11 @@ public class StateSpecs {
    */
   public static <InputT, AccumT, OutputT>
       StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
-          combiningValueFromInputInternal(
+  combiningFromInputInternal(
               Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
     try {
       Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
-      return combiningValueInternal(accumCoder, combineFn);
+      return combiningInternal(accumCoder, combineFn);
     } catch (CannotProvideCoderException e) {
       throw new IllegalArgumentException(
           "Unable to determine accumulator coder for "
@@ -154,15 +154,15 @@ public class StateSpecs {
   }
 
   private static <InputT, AccumT, OutputT>
-      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
           Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+    return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   private static <K, InputT, AccumT, OutputT>
-      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal(
           Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+    return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   /**
@@ -220,17 +220,17 @@ public class StateSpecs {
   public static <K, InputT, AccumT, OutputT>
       StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
           StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
-    if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
+    if (combiningSpec instanceof KeyedCombiningStateSpec) {
       // Checked above; conversion to a bag spec depends on the provided spec being one of those
       // created via the factory methods in this class.
       @SuppressWarnings("unchecked")
-      KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec =
-          (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+          (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
       return typedSpec.asBagSpec();
-    } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) {
+    } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) {
       @SuppressWarnings("unchecked")
-      KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
-          (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+          (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
       return typedSpec.asBagSpec();
     } else {
       throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
@@ -300,15 +300,15 @@ public class StateSpecs {
    *
    * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
    */
-  private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
-      extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
+  private static class CombiningStateSpec<InputT, AccumT, OutputT>
+      extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT>
       implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
     private final CombineFn<InputT, AccumT, OutputT> combineFn;
 
-    private CombiningValueStateSpec(
+    private CombiningStateSpec(
         @Nullable Coder<AccumT> accumCoder,
         CombineFn<InputT, AccumT, OutputT> combineFn) {
       super(accumCoder, combineFn.asKeyedFn());
@@ -338,14 +338,14 @@ public class StateSpecs {
    *
    * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
    */
-  private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
+  private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>
       implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
     private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
 
-    protected KeyedCombiningValueWithContextStateSpec(
+    protected KeyedCombiningWithContextStateSpec(
         @Nullable Coder<AccumT> accumCoder,
         KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
       this.combineFn = combineFn;
@@ -355,7 +355,7 @@ public class StateSpecs {
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
+      return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn);
     }
 
     @SuppressWarnings("unchecked")
@@ -371,9 +371,9 @@ public class StateSpecs {
     @Override public void finishSpecifying() {
       if (accumCoder == null) {
         throw new IllegalStateException("Unable to infer a coder for"
-            + " KeyedCombiningValueWithContextState and no Coder was specified."
+            + " KeyedCombiningWithContextState and no Coder was specified."
             + " Please set a coder by either invoking"
-            + " StateSpecs.keyedCombiningValue(Coder<AccumT> accumCoder,"
+            + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder,"
             + " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)"
             + " or by registering the coder in the Pipeline's CoderRegistry.");
       }
@@ -385,12 +385,12 @@ public class StateSpecs {
         return true;
       }
 
-      if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) {
+      if (!(obj instanceof KeyedCombiningWithContextStateSpec)) {
         return false;
       }
 
-      KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that =
-          (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj;
+      KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that =
+          (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj;
       return Objects.equals(this.accumCoder, that.accumCoder);
     }
 
@@ -409,14 +409,14 @@ public class StateSpecs {
    *
    * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
    */
-  private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
+  private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>
       implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
     private Coder<AccumT> accumCoder;
     private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
 
-    protected KeyedCombiningValueStateSpec(
+    protected KeyedCombiningStateSpec(
         @Nullable Coder<AccumT> accumCoder,
         KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
       this.keyedCombineFn = keyedCombineFn;
@@ -430,7 +430,7 @@ public class StateSpecs {
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
         String id, StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
+      return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn);
     }
 
     @SuppressWarnings("unchecked")
@@ -447,7 +447,7 @@ public class StateSpecs {
       if (getAccumCoder() == null) {
         throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
             + " Coder was specified. Please set a coder by either invoking"
-            + " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
+            + " StateSpecs.combining(Coder<AccumT> accumCoder,"
             + " CombineFn<InputT, AccumT, OutputT> combineFn)"
             + " or by registering the coder in the Pipeline's CoderRegistry.");
       }
@@ -459,12 +459,12 @@ public class StateSpecs {
         return true;
       }
 
-      if (!(obj instanceof CombiningValueStateSpec)) {
+      if (!(obj instanceof CombiningStateSpec)) {
         return false;
       }
 
-      KeyedCombiningValueStateSpec<?, ?, ?, ?> that =
-          (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj;
+      KeyedCombiningStateSpec<?, ?, ?, ?> that =
+          (KeyedCombiningStateSpec<?, ?, ?, ?>) obj;
       return Objects.equals(this.accumCoder, that.accumCoder);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d9b7b54..e305da1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2087,7 +2087,7 @@ public class ParDoTest implements Serializable {
               StateSpecs.set(VarIntCoder.of());
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2131,7 +2131,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2174,7 +2174,7 @@ public class ParDoTest implements Serializable {
 
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2216,7 +2216,7 @@ public class ParDoTest implements Serializable {
               StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2262,7 +2262,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2308,7 +2308,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
           @StateId(countStateId)
           private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
-              countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+              countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
               Sum.ofIntegers());
 
           @ProcessElement
@@ -2354,7 +2354,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<
                   Object, CombiningState<Double, CountSum<Double>, Double>>
               combiningState =
-                  StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+                  StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
 
           @ProcessElement
           public void processElement(
@@ -2394,7 +2394,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<
               Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
-              StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
                 public MyInteger createAccumulator() {
                   return new MyInteger(0);
@@ -2456,7 +2456,7 @@ public class ParDoTest implements Serializable {
           private final StateSpec<
               Object, CombiningState<Integer, MyInteger, Integer>>
               combiningState =
-              StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+              StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
                 @Override
                 public MyInteger createAccumulator() {
                   return new MyInteger(0);