You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:15 UTC
[4/6] beam git commit: Rename AccumulatorCombiningState to
CombiningState
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 095ca2a..26f1c98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.ValueState;
@@ -115,7 +115,7 @@ public class GroupIntoBatches<K, InputT>
private final StateSpec<Object, BagState<InputT>> batchSpec;
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- private final StateSpec<Object, AccumulatorCombiningState<Long, Long, Long>>
+ private final StateSpec<Object, CombiningState<Long, Long, Long>>
numElementsInBatchSpec;
@StateId(KEY_ID)
@@ -171,7 +171,7 @@ public class GroupIntoBatches<K, InputT>
@TimerId(END_OF_WINDOW_ID) Timer timer,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+ CombiningState<Long, Long, Long> numElementsInBatch,
@StateId(KEY_ID) ValueState<K> key,
ProcessContext c,
BoundedWindow window) {
@@ -203,7 +203,7 @@ public class GroupIntoBatches<K, InputT>
@StateId(KEY_ID) ValueState<K> key,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+ CombiningState<Long, Long, Long> numElementsInBatch,
BoundedWindow window) {
LOGGER.debug(
"*** END OF WINDOW *** for timer timestamp {} in windows {}",
@@ -215,7 +215,7 @@ public class GroupIntoBatches<K, InputT>
Context c,
ValueState<K> key,
BagState<InputT> batch,
- AccumulatorCombiningState<Long, Long, Long> numElementsInBatch) {
+ CombiningState<Long, Long, Long> numElementsInBatch) {
Iterable<InputT> values = batch.read();
// when the timer fires, batch state might be empty
if (Iterables.size(values) > 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
deleted file mode 100644
index 6b120f9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link GroupingState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
- extends GroupingState<InputT, OutputT> {
-
- /**
- * Read the merged accumulator for this combining value. It is implied that reading the
- * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
- * this.
- */
- AccumT getAccum();
-
- /**
- * Add an accumulator to this combining value. Depending on implementation this may immediately
- * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
- */
- void addAccum(AccumT accum);
-
- /**
- * Merge the given accumulators according to the underlying combiner.
- */
- AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
- @Override
- AccumulatorCombiningState<InputT, AccumT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
new file mode 100644
index 0000000..80e4dc9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
+ * to {@link GroupingState} that includes the {@code AccumT} type.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <AccumT> the type of accumulator
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface CombiningState<InputT, AccumT, OutputT>
+ extends GroupingState<InputT, OutputT> {
+
+ /**
+ * Read the merged accumulator for this combining value. It is implied that reading the
+ * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
+ * this.
+ */
+ AccumT getAccum();
+
+ /**
+ * Add an accumulator to this combining value. Depending on implementation this may immediately
+ * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
+ */
+ void addAccum(AccumT accum);
+
+ /**
+ * Merge the given accumulators according to the underlying combiner.
+ */
+ AccumT mergeAccumulators(Iterable<AccumT> accumulators);
+
+ @Override
+ CombiningState<InputT, AccumT, OutputT> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index fbfb475..98f7238 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,23 +39,23 @@ public interface StateBinder<K> {
String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
combineFn);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index db4b7de..974e11d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,7 +62,7 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
}
@@ -72,7 +72,7 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
@@ -85,7 +85,7 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
@@ -95,7 +95,7 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
@@ -108,7 +108,7 @@ public class StateSpecs {
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
@@ -118,7 +118,7 @@ public class StateSpecs {
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
keyedCombiningValueWithContext(
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
@@ -137,7 +137,7 @@ public class StateSpecs {
* only be used to initialize static values.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
combiningValueFromInputInternal(
Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
@@ -154,13 +154,13 @@ public class StateSpecs {
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
private static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
}
@@ -219,7 +219,7 @@ public class StateSpecs {
public static <K, InputT, AccumT, OutputT>
StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@@ -302,7 +302,7 @@ public class StateSpecs {
*/
private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
- implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -339,7 +339,7 @@ public class StateSpecs {
* <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
*/
private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
- implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -353,7 +353,7 @@ public class StateSpecs {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+ public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
}
@@ -410,7 +410,7 @@ public class StateSpecs {
* <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
*/
private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
- implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -428,7 +428,7 @@ public class StateSpecs {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+ public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index cc67ac2..d9b7b54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -77,6 +77,7 @@ import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.Mean.CountSum;
import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -92,8 +93,8 @@ import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.StateSpec;
@@ -2085,7 +2086,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, SetState<Integer>> setState =
StateSpecs.set(VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2093,7 +2094,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId) SetState<Integer> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
state.add(c.element().getValue());
count.add(1);
@@ -2129,7 +2130,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2137,7 +2138,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId) SetState<MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+ @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
state.add(new MyInteger(c.element().getValue()));
count.add(1);
if (count.read() >= 4) {
@@ -2172,7 +2173,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2180,7 +2181,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId) SetState<MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+ @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
state.add(new MyInteger(c.element().getValue()));
count.add(1);
if (count.read() >= 4) {
@@ -2214,14 +2215,14 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, MapState<String, Integer>> mapState =
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
public void processElement(
ProcessContext c, @StateId(stateId) MapState<String, Integer> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
KV<String, Integer> value = c.element().getValue();
state.put(value.getKey(), value.getValue());
@@ -2260,14 +2261,14 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
public void processElement(
ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
KV<String, Integer> value = c.element().getValue();
state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2306,14 +2307,14 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
public void processElement(
ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
KV<String, Integer> value = c.element().getValue();
state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2351,7 +2352,7 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<
- Object, AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double>>
+ Object, CombiningState<Double, CountSum<Double>, Double>>
combiningState =
StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
@@ -2359,7 +2360,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId)
- AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double> state) {
+ CombiningState<Double, CountSum<Double>, Double> state) {
state.add(c.element().getValue());
Double currentValue = state.read();
if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2391,7 +2392,7 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<
- Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+ Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
@@ -2423,7 +2424,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId)
- AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+ CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {
@@ -2453,7 +2454,7 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<
- Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+ Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
@@ -2485,7 +2486,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId)
- AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+ CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {