You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:13 UTC
[2/6] beam git commit: Rename combiningValue to combining in
StateSpecs
Rename combiningValue to combining in StateSpecs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33259d05
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33259d05
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33259d05
Branch: refs/heads/master
Commit: 33259d05e7ddfca7d27974394ea9e94b2b4b985c
Parents: ef480a3
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:37:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/runners/core/StateTags.java | 14 +--
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../beam/sdk/transforms/GroupIntoBatches.java | 2 +-
.../apache/beam/sdk/util/state/StateBinder.java | 6 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 90 ++++++++++----------
.../apache/beam/sdk/transforms/ParDoTest.java | 18 ++--
6 files changed, 66 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 4893919..77ae8f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -84,7 +84,7 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
@@ -94,7 +94,7 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
@@ -104,7 +104,7 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
@@ -162,7 +162,7 @@ public class StateTags {
combiningValue(
String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.combining(accumCoder, combineFn));
}
/**
@@ -174,7 +174,7 @@ public class StateTags {
keyedCombiningValue(String id, Coder<AccumT> accumCoder,
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn));
}
/**
@@ -188,7 +188,7 @@ public class StateTags {
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn));
}
/**
@@ -203,7 +203,7 @@ public class StateTags {
combiningValueFromInputInternal(
String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
+ new StructuredId(id), StateSpecs.combiningFromInputInternal(inputCoder, combineFn));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 2799b00..1c0f301 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -380,7 +380,7 @@
<!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
</Match>
<Match>
- <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/>
+ <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/>
<Method name="equals"/>
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
<!--[BEAM-421] Class doesn't override equals in superclass-->
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 26f1c98..2462b1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -132,7 +132,7 @@ public class GroupIntoBatches<K, InputT>
this.allowedLateness = allowedLateness;
this.batchSpec = StateSpecs.bag(inputValueCoder);
this.numElementsInBatchSpec =
- StateSpecs.combiningValue(
+ StateSpecs.combining(
VarLongCoder.of(),
new Combine.CombineFn<Long, Long, Long>() {
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 98f7238..64841fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,21 +39,21 @@ public interface StateBinder<K> {
String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
- <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 974e11d..30a7a6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,9 +62,9 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+ return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
}
/**
@@ -72,12 +72,12 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
- + "Consider using combiningValue(CombineFn<> combineFn) instead.");
- return combiningValueInternal(accumCoder, combineFn);
+ + "Consider using combining(CombineFn<> combineFn) instead.");
+ return combiningInternal(accumCoder, combineFn);
}
/**
@@ -85,9 +85,9 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+ return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
/**
@@ -95,12 +95,12 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
- + "Consider using keyedCombiningValue(KeyedCombineFn<> combineFn) instead.");
- return keyedCombiningValueInternal(accumCoder, combineFn);
+ + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead.");
+ return keyedCombiningInternal(accumCoder, combineFn);
}
/**
@@ -109,8 +109,8 @@ public class StateSpecs {
*/
public static <K, InputT, AccumT, OutputT>
StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+ keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+ return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
/**
@@ -119,13 +119,13 @@ public class StateSpecs {
*/
public static <K, InputT, AccumT, OutputT>
StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValueWithContext(
+ keyedCombiningWithContext(
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. Consider using "
- + "keyedCombiningValueWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
- return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(
+ + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
+ return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(
accumCoder, combineFn);
}
@@ -138,11 +138,11 @@ public class StateSpecs {
*/
public static <InputT, AccumT, OutputT>
StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
- combiningValueFromInputInternal(
+ combiningFromInputInternal(
Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
- return combiningValueInternal(accumCoder, combineFn);
+ return combiningInternal(accumCoder, combineFn);
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to determine accumulator coder for "
@@ -154,15 +154,15 @@ public class StateSpecs {
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+ return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
private static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+ return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
}
/**
@@ -220,17 +220,17 @@ public class StateSpecs {
public static <K, InputT, AccumT, OutputT>
StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
- if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
+ if (combiningSpec instanceof KeyedCombiningStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@SuppressWarnings("unchecked")
- KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec =
- (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+ (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
- } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) {
+ } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) {
@SuppressWarnings("unchecked")
- KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
- (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+ (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
} else {
throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
@@ -300,15 +300,15 @@ public class StateSpecs {
*
* <p>Includes the {@link CombineFn} and the coder for the accumulator type.
*/
- private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
- extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
+ private static class CombiningStateSpec<InputT, AccumT, OutputT>
+ extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT>
implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
private final CombineFn<InputT, AccumT, OutputT> combineFn;
- private CombiningValueStateSpec(
+ private CombiningStateSpec(
@Nullable Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
super(accumCoder, combineFn.asKeyedFn());
@@ -338,14 +338,14 @@ public class StateSpecs {
*
* <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
*/
- private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
+ private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>
implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
- protected KeyedCombiningValueWithContextStateSpec(
+ protected KeyedCombiningWithContextStateSpec(
@Nullable Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
this.combineFn = combineFn;
@@ -355,7 +355,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
+ return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn);
}
@SuppressWarnings("unchecked")
@@ -371,9 +371,9 @@ public class StateSpecs {
@Override public void finishSpecifying() {
if (accumCoder == null) {
throw new IllegalStateException("Unable to infer a coder for"
- + " KeyedCombiningValueWithContextState and no Coder was specified."
+ + " KeyedCombiningWithContextState and no Coder was specified."
+ " Please set a coder by either invoking"
- + " StateSpecs.keyedCombiningValue(Coder<AccumT> accumCoder,"
+ + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder,"
+ " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)"
+ " or by registering the coder in the Pipeline's CoderRegistry.");
}
@@ -385,12 +385,12 @@ public class StateSpecs {
return true;
}
- if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) {
+ if (!(obj instanceof KeyedCombiningWithContextStateSpec)) {
return false;
}
- KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that =
- (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj;
+ KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that =
+ (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
@@ -409,14 +409,14 @@ public class StateSpecs {
*
* <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
*/
- private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
+ private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>
implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
- protected KeyedCombiningValueStateSpec(
+ protected KeyedCombiningStateSpec(
@Nullable Coder<AccumT> accumCoder,
KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
this.keyedCombineFn = keyedCombineFn;
@@ -430,7 +430,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
+ return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn);
}
@SuppressWarnings("unchecked")
@@ -447,7 +447,7 @@ public class StateSpecs {
if (getAccumCoder() == null) {
throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
+ " Coder was specified. Please set a coder by either invoking"
- + " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
+ + " StateSpecs.combining(Coder<AccumT> accumCoder,"
+ " CombineFn<InputT, AccumT, OutputT> combineFn)"
+ " or by registering the coder in the Pipeline's CoderRegistry.");
}
@@ -459,12 +459,12 @@ public class StateSpecs {
return true;
}
- if (!(obj instanceof CombiningValueStateSpec)) {
+ if (!(obj instanceof CombiningStateSpec)) {
return false;
}
- KeyedCombiningValueStateSpec<?, ?, ?, ?> that =
- (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj;
+ KeyedCombiningStateSpec<?, ?, ?, ?> that =
+ (KeyedCombiningStateSpec<?, ?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d9b7b54..e305da1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2087,7 +2087,7 @@ public class ParDoTest implements Serializable {
StateSpecs.set(VarIntCoder.of());
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2131,7 +2131,7 @@ public class ParDoTest implements Serializable {
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2174,7 +2174,7 @@ public class ParDoTest implements Serializable {
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2216,7 +2216,7 @@ public class ParDoTest implements Serializable {
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2262,7 +2262,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2308,7 +2308,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2354,7 +2354,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<
Object, CombiningState<Double, CountSum<Double>, Double>>
combiningState =
- StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+ StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
@ProcessElement
public void processElement(
@@ -2394,7 +2394,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<
Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
- StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
public MyInteger createAccumulator() {
return new MyInteger(0);
@@ -2456,7 +2456,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<
Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
- StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
public MyInteger createAccumulator() {
return new MyInteger(0);