You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:12 UTC
[1/6] beam git commit: Upgrade Dataflow worker image to
beam-master-20170405
Repository: beam
Updated Branches:
refs/heads/master bb1838476 -> b92032ff6
Upgrade Dataflow worker image to beam-master-20170405
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/359040e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/359040e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/359040e8
Branch: refs/heads/master
Commit: 359040e8f5151bde21752150030dd759766fc3e8
Parents: 33259d0
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 5 18:53:58 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/359040e8/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index fe531c7..2e3dc8a 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <dataflow.container_version>beam-master-20170329</dataflow.container_version>
+ <dataflow.container_version>beam-master-20170405</dataflow.container_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>
[4/6] beam git commit: Rename AccumulatorCombiningState to
CombiningState
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 095ca2a..26f1c98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.ValueState;
@@ -115,7 +115,7 @@ public class GroupIntoBatches<K, InputT>
private final StateSpec<Object, BagState<InputT>> batchSpec;
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- private final StateSpec<Object, AccumulatorCombiningState<Long, Long, Long>>
+ private final StateSpec<Object, CombiningState<Long, Long, Long>>
numElementsInBatchSpec;
@StateId(KEY_ID)
@@ -171,7 +171,7 @@ public class GroupIntoBatches<K, InputT>
@TimerId(END_OF_WINDOW_ID) Timer timer,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+ CombiningState<Long, Long, Long> numElementsInBatch,
@StateId(KEY_ID) ValueState<K> key,
ProcessContext c,
BoundedWindow window) {
@@ -203,7 +203,7 @@ public class GroupIntoBatches<K, InputT>
@StateId(KEY_ID) ValueState<K> key,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- AccumulatorCombiningState<Long, Long, Long> numElementsInBatch,
+ CombiningState<Long, Long, Long> numElementsInBatch,
BoundedWindow window) {
LOGGER.debug(
"*** END OF WINDOW *** for timer timestamp {} in windows {}",
@@ -215,7 +215,7 @@ public class GroupIntoBatches<K, InputT>
Context c,
ValueState<K> key,
BagState<InputT> batch,
- AccumulatorCombiningState<Long, Long, Long> numElementsInBatch) {
+ CombiningState<Long, Long, Long> numElementsInBatch) {
Iterable<InputT> values = batch.read();
// when the timer fires, batch state might be empty
if (Iterables.size(values) > 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
deleted file mode 100644
index 6b120f9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link GroupingState} that includes the {@code AccumT} type.
- *
- * @param <InputT> the type of values added to the state
- * @param <AccumT> the type of accumulator
- * @param <OutputT> the type of value extracted from the state
- */
-public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
- extends GroupingState<InputT, OutputT> {
-
- /**
- * Read the merged accumulator for this combining value. It is implied that reading the
- * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for
- * this.
- */
- AccumT getAccum();
-
- /**
- * Add an accumulator to this combining value. Depending on implementation this may immediately
- * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
- */
- void addAccum(AccumT accum);
-
- /**
- * Merge the given accumulators according to the underlying combiner.
- */
- AccumT mergeAccumulators(Iterable<AccumT> accumulators);
-
- @Override
- AccumulatorCombiningState<InputT, AccumT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
new file mode 100644
index 0000000..80e4dc9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State for a single value that is managed by a {@link CombineFn}. This is an internal extension
+ * to {@link GroupingState} that includes the {@code AccumT} type.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <AccumT> the type of accumulator
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface CombiningState<InputT, AccumT, OutputT>
+ extends GroupingState<InputT, OutputT> {
+
+ /**
+ * Read the merged accumulator for this combining value. It is implied that reading the
+ * state involves reading the accumulator, so {@link #readLater} is sufficient to prefetch for
+ * this.
+ */
+ AccumT getAccum();
+
+ /**
+ * Add an accumulator to this combining value. Depending on implementation this may immediately
+ * merge it with the previous accumulator, or may buffer this accumulator for a future merge.
+ */
+ void addAccum(AccumT accum);
+
+ /**
+ * Merge the given accumulators according to the underlying combiner.
+ */
+ AccumT mergeAccumulators(Iterable<AccumT> accumulators);
+
+ @Override
+ CombiningState<InputT, AccumT, OutputT> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index fbfb475..98f7238 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,23 +39,23 @@ public interface StateBinder<K> {
String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
combineFn);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index db4b7de..974e11d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,7 +62,7 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
}
@@ -72,7 +72,7 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
@@ -85,7 +85,7 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
@@ -95,7 +95,7 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
@@ -108,7 +108,7 @@ public class StateSpecs {
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
@@ -118,7 +118,7 @@ public class StateSpecs {
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
keyedCombiningValueWithContext(
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
@@ -137,7 +137,7 @@ public class StateSpecs {
* only be used to initialize static values.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
combiningValueFromInputInternal(
Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
@@ -154,13 +154,13 @@ public class StateSpecs {
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
private static <K, InputT, AccumT, OutputT>
- StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
}
@@ -219,7 +219,7 @@ public class StateSpecs {
public static <K, InputT, AccumT, OutputT>
StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@@ -302,7 +302,7 @@ public class StateSpecs {
*/
private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
- implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -339,7 +339,7 @@ public class StateSpecs {
* <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
*/
private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
- implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -353,7 +353,7 @@ public class StateSpecs {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+ public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
}
@@ -410,7 +410,7 @@ public class StateSpecs {
* <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
*/
private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
- implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -428,7 +428,7 @@ public class StateSpecs {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> bind(
+ public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index cc67ac2..d9b7b54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -77,6 +77,7 @@ import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.Mean.CountSum;
import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -92,8 +93,8 @@ import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.StateSpec;
@@ -2085,7 +2086,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, SetState<Integer>> setState =
StateSpecs.set(VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2093,7 +2094,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId) SetState<Integer> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
state.add(c.element().getValue());
count.add(1);
@@ -2129,7 +2130,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2137,7 +2138,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId) SetState<MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+ @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
state.add(new MyInteger(c.element().getValue()));
count.add(1);
if (count.read() >= 4) {
@@ -2172,7 +2173,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2180,7 +2181,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId) SetState<MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) {
+ @StateId(countStateId) CombiningState<Integer, int[], Integer> count) {
state.add(new MyInteger(c.element().getValue()));
count.add(1);
if (count.read() >= 4) {
@@ -2214,14 +2215,14 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, MapState<String, Integer>> mapState =
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
public void processElement(
ProcessContext c, @StateId(stateId) MapState<String, Integer> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
KV<String, Integer> value = c.element().getValue();
state.put(value.getKey(), value.getValue());
@@ -2260,14 +2261,14 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
public void processElement(
ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
KV<String, Integer> value = c.element().getValue();
state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2306,14 +2307,14 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
- private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
public void processElement(
ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
- @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer>
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count) {
KV<String, Integer> value = c.element().getValue();
state.put(value.getKey(), new MyInteger(value.getValue()));
@@ -2351,7 +2352,7 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<
- Object, AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double>>
+ Object, CombiningState<Double, CountSum<Double>, Double>>
combiningState =
StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
@@ -2359,7 +2360,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId)
- AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double> state) {
+ CombiningState<Double, CountSum<Double>, Double> state) {
state.add(c.element().getValue());
Double currentValue = state.read();
if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2391,7 +2392,7 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<
- Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+ Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
@@ -2423,7 +2424,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId)
- AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+ CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {
@@ -2453,7 +2454,7 @@ public class ParDoTest implements Serializable {
@StateId(stateId)
private final StateSpec<
- Object, AccumulatorCombiningState<Integer, MyInteger, Integer>>
+ Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
@@ -2485,7 +2486,7 @@ public class ParDoTest implements Serializable {
public void processElement(
ProcessContext c,
@StateId(stateId)
- AccumulatorCombiningState<Integer, MyInteger, Integer> state) {
+ CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {
[2/6] beam git commit: Rename combiningValue to combining in
StateSpecs
Posted by ke...@apache.org.
Rename combiningValue to combining in StateSpecs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33259d05
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33259d05
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33259d05
Branch: refs/heads/master
Commit: 33259d05e7ddfca7d27974394ea9e94b2b4b985c
Parents: ef480a3
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:37:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/runners/core/StateTags.java | 14 +--
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../beam/sdk/transforms/GroupIntoBatches.java | 2 +-
.../apache/beam/sdk/util/state/StateBinder.java | 6 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 90 ++++++++++----------
.../apache/beam/sdk/transforms/ParDoTest.java | 18 ++--
6 files changed, 66 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 4893919..77ae8f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -84,7 +84,7 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
@@ -94,7 +94,7 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
@@ -104,7 +104,7 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
@@ -162,7 +162,7 @@ public class StateTags {
combiningValue(
String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.combining(accumCoder, combineFn));
}
/**
@@ -174,7 +174,7 @@ public class StateTags {
keyedCombiningValue(String id, Coder<AccumT> accumCoder,
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn));
}
/**
@@ -188,7 +188,7 @@ public class StateTags {
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
+ new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn));
}
/**
@@ -203,7 +203,7 @@ public class StateTags {
combiningValueFromInputInternal(
String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
- new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
+ new StructuredId(id), StateSpecs.combiningFromInputInternal(inputCoder, combineFn));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 2799b00..1c0f301 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -380,7 +380,7 @@
<!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
</Match>
<Match>
- <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/>
+ <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/>
<Method name="equals"/>
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
<!--[BEAM-421] Class doesn't override equals in superclass-->
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 26f1c98..2462b1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -132,7 +132,7 @@ public class GroupIntoBatches<K, InputT>
this.allowedLateness = allowedLateness;
this.batchSpec = StateSpecs.bag(inputValueCoder);
this.numElementsInBatchSpec =
- StateSpecs.combiningValue(
+ StateSpecs.combining(
VarLongCoder.of(),
new Combine.CombineFn<Long, Long, Long>() {
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 98f7238..64841fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -39,21 +39,21 @@ public interface StateBinder<K> {
String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
- <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
String id,
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 974e11d..30a7a6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -62,9 +62,9 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+ return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
}
/**
@@ -72,12 +72,12 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
- + "Consider using combiningValue(CombineFn<> combineFn) instead.");
- return combiningValueInternal(accumCoder, combineFn);
+ + "Consider using combining(CombineFn<> combineFn) instead.");
+ return combiningInternal(accumCoder, combineFn);
}
/**
@@ -85,9 +85,9 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+ return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
/**
@@ -95,12 +95,12 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
- + "Consider using keyedCombiningValue(KeyedCombineFn<> combineFn) instead.");
- return keyedCombiningValueInternal(accumCoder, combineFn);
+ + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead.");
+ return keyedCombiningInternal(accumCoder, combineFn);
}
/**
@@ -109,8 +109,8 @@ public class StateSpecs {
*/
public static <K, InputT, AccumT, OutputT>
StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
+ keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+ return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
}
/**
@@ -119,13 +119,13 @@ public class StateSpecs {
*/
public static <K, InputT, AccumT, OutputT>
StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
- keyedCombiningValueWithContext(
+ keyedCombiningWithContext(
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. Consider using "
- + "keyedCombiningValueWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
- return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(
+ + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
+ return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(
accumCoder, combineFn);
}
@@ -138,11 +138,11 @@ public class StateSpecs {
*/
public static <InputT, AccumT, OutputT>
StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
- combiningValueFromInputInternal(
+ combiningFromInputInternal(
Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
- return combiningValueInternal(accumCoder, combineFn);
+ return combiningInternal(accumCoder, combineFn);
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to determine accumulator coder for "
@@ -154,15 +154,15 @@ public class StateSpecs {
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal(
+ StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
+ return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
private static <K, InputT, AccumT, OutputT>
- StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal(
+ StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal(
Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+ return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
}
/**
@@ -220,17 +220,17 @@ public class StateSpecs {
public static <K, InputT, AccumT, OutputT>
StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
- if (combiningSpec instanceof KeyedCombiningValueStateSpec) {
+ if (combiningSpec instanceof KeyedCombiningStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@SuppressWarnings("unchecked")
- KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec =
- (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+ (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
- } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) {
+ } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) {
@SuppressWarnings("unchecked")
- KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
- (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+ KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
+ (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
} else {
throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
@@ -300,15 +300,15 @@ public class StateSpecs {
*
* <p>Includes the {@link CombineFn} and the coder for the accumulator type.
*/
- private static class CombiningValueStateSpec<InputT, AccumT, OutputT>
- extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT>
+ private static class CombiningStateSpec<InputT, AccumT, OutputT>
+ extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT>
implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
private final CombineFn<InputT, AccumT, OutputT> combineFn;
- private CombiningValueStateSpec(
+ private CombiningStateSpec(
@Nullable Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
super(accumCoder, combineFn.asKeyedFn());
@@ -338,14 +338,14 @@ public class StateSpecs {
*
* <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
*/
- private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>
+ private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>
implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
- protected KeyedCombiningValueWithContextStateSpec(
+ protected KeyedCombiningWithContextStateSpec(
@Nullable Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
this.combineFn = combineFn;
@@ -355,7 +355,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn);
+ return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn);
}
@SuppressWarnings("unchecked")
@@ -371,9 +371,9 @@ public class StateSpecs {
@Override public void finishSpecifying() {
if (accumCoder == null) {
throw new IllegalStateException("Unable to infer a coder for"
- + " KeyedCombiningValueWithContextState and no Coder was specified."
+ + " KeyedCombiningWithContextState and no Coder was specified."
+ " Please set a coder by either invoking"
- + " StateSpecs.keyedCombiningValue(Coder<AccumT> accumCoder,"
+ + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder,"
+ " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)"
+ " or by registering the coder in the Pipeline's CoderRegistry.");
}
@@ -385,12 +385,12 @@ public class StateSpecs {
return true;
}
- if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) {
+ if (!(obj instanceof KeyedCombiningWithContextStateSpec)) {
return false;
}
- KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that =
- (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj;
+ KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that =
+ (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
@@ -409,14 +409,14 @@ public class StateSpecs {
*
* <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
*/
- private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>
+ private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>
implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
- protected KeyedCombiningValueStateSpec(
+ protected KeyedCombiningStateSpec(
@Nullable Coder<AccumT> accumCoder,
KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
this.keyedCombineFn = keyedCombineFn;
@@ -430,7 +430,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
String id, StateBinder<? extends K> visitor) {
- return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn);
+ return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn);
}
@SuppressWarnings("unchecked")
@@ -447,7 +447,7 @@ public class StateSpecs {
if (getAccumCoder() == null) {
throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
+ " Coder was specified. Please set a coder by either invoking"
- + " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
+ + " StateSpecs.combining(Coder<AccumT> accumCoder,"
+ " CombineFn<InputT, AccumT, OutputT> combineFn)"
+ " or by registering the coder in the Pipeline's CoderRegistry.");
}
@@ -459,12 +459,12 @@ public class StateSpecs {
return true;
}
- if (!(obj instanceof CombiningValueStateSpec)) {
+ if (!(obj instanceof CombiningStateSpec)) {
return false;
}
- KeyedCombiningValueStateSpec<?, ?, ?, ?> that =
- (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj;
+ KeyedCombiningStateSpec<?, ?, ?, ?> that =
+ (KeyedCombiningStateSpec<?, ?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d9b7b54..e305da1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2087,7 +2087,7 @@ public class ParDoTest implements Serializable {
StateSpecs.set(VarIntCoder.of());
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2131,7 +2131,7 @@ public class ParDoTest implements Serializable {
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2174,7 +2174,7 @@ public class ParDoTest implements Serializable {
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2216,7 +2216,7 @@ public class ParDoTest implements Serializable {
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2262,7 +2262,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2308,7 +2308,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
@StateId(countStateId)
private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(),
+ countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@ProcessElement
@@ -2354,7 +2354,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<
Object, CombiningState<Double, CountSum<Double>, Double>>
combiningState =
- StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+ StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
@ProcessElement
public void processElement(
@@ -2394,7 +2394,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<
Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
- StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
public MyInteger createAccumulator() {
return new MyInteger(0);
@@ -2456,7 +2456,7 @@ public class ParDoTest implements Serializable {
private final StateSpec<
Object, CombiningState<Integer, MyInteger, Integer>>
combiningState =
- StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
@Override
public MyInteger createAccumulator() {
return new MyInteger(0);
[5/6] beam git commit: Rename AccumulatorCombiningState to
CombiningState
Posted by ke...@apache.org.
Rename AccumulatorCombiningState to CombiningState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef480a37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef480a37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef480a37
Branch: refs/heads/master
Commit: ef480a37ebe039d0eaa2d4ca758ea015893e9089
Parents: 24c0495
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:27:26 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700
----------------------------------------------------------------------
.../translation/utils/ApexStateInternals.java | 28 ++++----
.../utils/ApexStateInternalsTest.java | 14 ++--
.../runners/core/InMemoryStateInternals.java | 36 +++++------
.../apache/beam/runners/core/NonEmptyPanes.java | 4 +-
.../beam/runners/core/SideInputHandler.java | 18 +++---
.../apache/beam/runners/core/StateMerging.java | 14 ++--
.../org/apache/beam/runners/core/StateTag.java | 14 ++--
.../org/apache/beam/runners/core/StateTags.java | 24 +++----
.../beam/runners/core/SystemReduceFn.java | 4 +-
.../AfterDelayFromFirstElementStateMachine.java | 8 +--
.../core/triggers/AfterPaneStateMachine.java | 4 +-
.../core/InMemoryStateInternalsTest.java | 14 ++--
.../CopyOnAccessInMemoryStateInternals.java | 46 +++++++------
.../CopyOnAccessInMemoryStateInternalsTest.java | 6 +-
.../state/FlinkBroadcastStateInternals.java | 68 ++++++++++----------
.../state/FlinkKeyGroupStateInternals.java | 16 ++---
.../state/FlinkSplitStateInternals.java | 16 ++---
.../streaming/state/FlinkStateInternals.java | 68 ++++++++++----------
.../FlinkBroadcastStateInternalsTest.java | 14 ++--
.../streaming/FlinkStateInternalsTest.java | 14 ++--
.../spark/stateful/SparkStateInternals.java | 30 ++++-----
.../beam/sdk/transforms/GroupIntoBatches.java | 10 +--
.../util/state/AccumulatorCombiningState.java | 53 ---------------
.../beam/sdk/util/state/CombiningState.java | 53 +++++++++++++++
.../apache/beam/sdk/util/state/StateBinder.java | 12 ++--
.../apache/beam/sdk/util/state/StateSpecs.java | 30 ++++-----
.../apache/beam/sdk/transforms/ParDoTest.java | 39 +++++------
27 files changed, 328 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 7634366..c59afc5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -139,12 +139,12 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new ApexAccumulatorCombiningState<>(
+ return new ApexCombiningState<>(
namespace,
address,
accumCoder,
@@ -161,12 +161,12 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new ApexAccumulatorCombiningState<>(
+ return new ApexCombiningState<>(
namespace,
address,
accumCoder,
@@ -174,9 +174,9 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -323,14 +323,14 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
}
- private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+ private final class ApexCombiningState<K, InputT, AccumT, OutputT>
extends AbstractState<AccumT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ implements CombiningState<InputT, AccumT, OutputT> {
private final K key;
private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
- private ApexAccumulatorCombiningState(StateNamespace namespace,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ private ApexCombiningState(StateNamespace namespace,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
@@ -339,7 +339,7 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
}
@Override
- public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
+ public ApexCombiningState<K, InputT, AccumT, OutputT> readLater() {
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index a1494ad..4f4ecfb 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
@@ -58,7 +58,7 @@ public class ApexStateInternalsTest {
private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -181,9 +181,9 @@ public class ApexStateInternalsTest {
@Test
public void testMergeCombiningValueIntoSource() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
value1.add(5);
@@ -202,11 +202,11 @@ public class ApexStateInternalsTest {
@Test
public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value3 =
+ CombiningState<Integer, int[], Integer> value3 =
underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
value1.add(5);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index b4b2b38..0d5b058 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -38,8 +38,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -148,12 +148,12 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
+ return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
}
@Override
@@ -164,18 +164,18 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn);
+ return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn);
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -307,17 +307,17 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
/**
- * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}.
+ * An {@link InMemoryState} implementation of {@link CombiningState}.
*/
- public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
- InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
+ public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT>,
+ InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> {
private final K key;
private boolean isCleared = true;
private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
private AccumT accum;
- public InMemoryCombiningValue(
+ public InMemoryCombiningState(
K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
this.key = key;
this.combineFn = combineFn;
@@ -325,7 +325,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() {
+ public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -384,9 +384,9 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
- InMemoryCombiningValue<K, InputT, AccumT, OutputT> that =
- new InMemoryCombiningValue<>(key, combineFn);
+ public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() {
+ InMemoryCombiningState<K, InputT, AccumT, OutputT> that =
+ new InMemoryCombiningState<>(key, combineFn);
if (!this.isCleared) {
that.isCleared = this.isCleared;
that.addAccum(accum);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index aa033ce..3e875c2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.ReadableState;
/**
@@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
extends NonEmptyPanes<K, W> {
- private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+ private static final StateTag<Object, CombiningState<Long, long[], Long>>
PANE_ADDITIONS_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
"count", VarLongCoder.of(), Sum.ofLongs()));
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 24f326d..26e920a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.values.PCollectionView;
@@ -71,10 +71,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
PCollectionView<?>,
StateTag<
Object,
- AccumulatorCombiningState<
- BoundedWindow,
- Set<BoundedWindow>,
- Set<BoundedWindow>>>> availableWindowsTags;
+ CombiningState<
+ BoundedWindow,
+ Set<BoundedWindow>,
+ Set<BoundedWindow>>>> availableWindowsTags;
/**
* State tag for the actual contents of each side input per window.
@@ -106,10 +106,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
StateTag<
Object,
- AccumulatorCombiningState<
- BoundedWindow,
- Set<BoundedWindow>,
- Set<BoundedWindow>>> availableTag = StateTags.combiningValue(
+ CombiningState<
+ BoundedWindow,
+ Set<BoundedWindow>,
+ Set<BoundedWindow>>> availableTag = StateTags.combiningValue(
"side-input-available-windows-" + sideInput.getTagInternal().getId(),
SetCoder.of(windowCoder),
new WindowSetCombineFn());
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 593d697..3410850 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -24,8 +24,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -172,7 +172,7 @@ public class StateMerging {
*/
public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
MergingStateAccessor<K, W> context,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) {
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) {
mergeCombiningValues(
context.accessInEachMergingWindow(address).values(), context.access(address));
}
@@ -182,8 +182,8 @@ public class StateMerging {
* {@code result}.
*/
public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
- Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources,
- AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
+ Collection<CombiningState<InputT, AccumT, OutputT>> sources,
+ CombiningState<InputT, AccumT, OutputT> result) {
if (sources.isEmpty()) {
// Nothing to merge.
return;
@@ -194,18 +194,18 @@ public class StateMerging {
}
// Prefetch.
List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size());
- for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+ for (CombiningState<InputT, AccumT, OutputT> source : sources) {
prefetchRead(source);
}
// Read.
List<AccumT> accumulators = new ArrayList<>(futures.size());
- for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+ for (CombiningState<InputT, AccumT, OutputT> source : sources) {
accumulators.add(source.getAccum());
}
// Merge (possibly update and return one of the existing accumulators).
AccumT merged = result.mergeAccumulators(accumulators);
// Clear sources.
- for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+ for (CombiningState<InputT, AccumT, OutputT> source : sources) {
source.clear();
}
// Update result.
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index 802aede..12c59ad 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
@@ -94,20 +94,20 @@ public interface StateTag<K, StateT extends State> extends Serializable {
StateTag<? super K, MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
- <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
combineFn);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 1c70dff..4893919 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -30,8 +30,8 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
@@ -84,9 +84,9 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
@@ -94,9 +94,9 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
@@ -104,9 +104,9 @@ public class StateTags {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
String id,
- StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
return binder.bindKeyedCombiningValueWithContext(
@@ -158,7 +158,7 @@ public class StateTags {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
combiningValue(
String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
@@ -170,7 +170,7 @@ public class StateTags {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT,
- OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>>
keyedCombiningValue(String id, Coder<AccumT> accumCoder,
KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
@@ -182,7 +182,7 @@ public class StateTags {
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <K, InputT, AccumT, OutputT>
- StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateTag<K, CombiningState<InputT, AccumT, OutputT>>
keyedCombiningValueWithContext(
String id,
Coder<AccumT> accumCoder,
@@ -199,7 +199,7 @@ public class StateTags {
* should only be used to initialize static values.
*/
public static <InputT, AccumT, OutputT>
- StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+ StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
combiningValueFromInputInternal(
String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
@@ -255,7 +255,7 @@ public class StateTags {
public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
convertToBagTagInternal(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> combiningTag) {
return new SimpleStateTag<>(
new StructuredId(combiningTag.getId()),
StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 0f2f790..f618d88 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
@@ -71,7 +71,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
AccumT, OutputT, W>
combining(
final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
+ final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag;
if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
bufferTag = StateTags.makeSystemTagInternal(
StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 29c29a7..b416788 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -30,11 +30,11 @@ import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.Holder;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -55,8 +55,8 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
ImmutableList.<SerializableFunction<Instant, Instant>>of();
- protected static final StateTag<Object, AccumulatorCombiningState<Instant,
- Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
+ protected static final StateTag<Object, CombiningState<Instant,
+ Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
"delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 1dd5b65..11323cc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -27,7 +27,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStat
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
/**
* {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane.
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
@Experimental(Experimental.Kind.TRIGGER)
public class AfterPaneStateMachine extends OnceTriggerStateMachine {
-private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+private static final StateTag<Object, CombiningState<Long, long[], Long>>
ELEMENTS_IN_PANE_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
"count", VarLongCoder.of(), Sum.ofLongs()));
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 5f90084..e4fb5c1 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
@@ -61,7 +61,7 @@ public class InMemoryStateInternalsTest {
private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -411,9 +411,9 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeCombiningValueIntoSource() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
value1.add(5);
@@ -432,11 +432,11 @@ public class InMemoryStateInternalsTest {
@Test
public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value3 =
+ CombiningState<Integer, int[], Integer> value3 =
underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
value1.add(5);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index ff5c23c..0665812 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryBag;
-import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningValue;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningState;
import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryMap;
import org.apache.beam.runners.core.InMemoryStateInternals.InMemorySet;
import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState;
@@ -45,8 +45,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
@@ -306,19 +306,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
- InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
- existingState = (
- InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
- OutputT>>) underlying.get().get(namespace, address, c);
+ InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
+ (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>)
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
- return new InMemoryCombiningValue<>(
+ return new InMemoryCombiningState<>(
key, combineFn.asKeyedFn());
}
}
@@ -367,27 +366,26 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
- InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT, OutputT>>
- existingState = (
- InMemoryState<? extends AccumulatorCombiningState<InputT, AccumT,
- OutputT>>) underlying.get().get(namespace, address, c);
+ InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
+ (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>)
+ underlying.get().get(namespace, address, c);
return existingState.copy();
} else {
- return new InMemoryCombiningValue<>(key, combineFn);
+ return new InMemoryCombiningState<>(key, combineFn);
}
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
return bindKeyedCombiningValue(
@@ -449,9 +447,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return underlying.get(namespace, address, c);
}
@@ -476,18 +474,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
return underlying.get(namespace, address, c);
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
return bindKeyedCombiningValue(
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 59c0a37..142af32 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -45,8 +45,8 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
@@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
- StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
+ StateTag<Object, CombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
@@ -259,7 +259,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
- StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag =
+ StateTag<String, CombiningState<Long, long[], Long>> stateTag =
StateTags.keyedCombiningValue(
"summer",
sumLongFn.getAccumulatorCoder(
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index bcc3660..3203446 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -133,23 +133,23 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new FlinkAccumulatorCombiningState<>(
+ return new FlinkCombiningState<>(
stateBackend, address, combineFn, namespace, accumCoder);
}
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkKeyedAccumulatorCombiningState<>(
+ return new FlinkKeyedCombiningState<>(
stateBackend,
address,
combineFn,
@@ -160,12 +160,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkAccumulatorCombiningStateWithContext<>(
+ return new FlinkCombiningStateWithContext<>(
stateBackend,
address,
combineFn,
@@ -464,17 +464,17 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
}
- private class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+ private class FlinkCombiningState<K, InputT, AccumT, OutputT>
extends AbstractBroadcastState<AccumT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
- FlinkAccumulatorCombiningState(
+ FlinkCombiningState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder) {
@@ -486,7 +486,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -566,8 +566,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
return false;
}
- FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
- (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
+ FlinkCombiningState<?, ?, ?, ?> that =
+ (FlinkCombiningState<?, ?, ?, ?>) o;
return namespace.equals(that.namespace) && address.equals(that.address);
@@ -581,18 +581,18 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
}
- private class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+ private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
extends AbstractBroadcastState<AccumT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
- FlinkKeyedAccumulatorCombiningState(
+ FlinkKeyedCombiningState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -607,7 +607,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -706,8 +706,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
return false;
}
- FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
- (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
+ FlinkKeyedCombiningState<?, ?, ?, ?> that =
+ (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
return namespace.equals(that.namespace) && address.equals(that.address);
@@ -721,20 +721,20 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
}
- private class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT>
+ private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
extends AbstractBroadcastState<AccumT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
private final CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
private final CombineWithContext.Context context;
- FlinkAccumulatorCombiningStateWithContext(
+ FlinkCombiningStateWithContext(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
@@ -752,7 +752,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -847,8 +847,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
return false;
}
- FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
- (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
+ FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+ (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
return namespace.equals(that.namespace) && address.equals(that.address);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index a29b1b2..24b340e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -156,9 +156,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -166,8 +166,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
@@ -176,8 +176,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn) {
@@ -190,7 +190,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
StateTag<? super K, WatermarkHoldState<W>> address,
OutputTimeFn<? super W> outputTimeFn) {
throw new UnsupportedOperationException(
- String.format("%s is not supported", AccumulatorCombiningState.class.getSimpleName()));
+ String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
});
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index d9e87d1..2bf0bf1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -116,9 +116,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -126,8 +126,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
@@ -136,8 +136,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn) {
@@ -150,7 +150,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
StateTag<? super K, WatermarkHoldState<W>> address,
OutputTimeFn<? super W> outputTimeFn) {
throw new UnsupportedOperationException(
- String.format("%s is not supported", AccumulatorCombiningState.class.getSimpleName()));
+ String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
});
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 9033ba7..4f961e5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -142,23 +142,23 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new FlinkAccumulatorCombiningState<>(
+ return new FlinkCombiningState<>(
flinkStateBackend, address, combineFn, namespace, accumCoder);
}
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkKeyedAccumulatorCombiningState<>(
+ return new FlinkKeyedCombiningState<>(
flinkStateBackend,
address,
combineFn,
@@ -169,12 +169,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
- AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkAccumulatorCombiningStateWithContext<>(
+ return new FlinkCombiningStateWithContext<>(
flinkStateBackend,
address,
combineFn,
@@ -393,18 +393,18 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
}
- private static class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- FlinkAccumulatorCombiningState(
+ FlinkCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder) {
@@ -420,7 +420,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -546,8 +546,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
return false;
}
- FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
- (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
+ FlinkCombiningState<?, ?, ?, ?> that =
+ (FlinkCombiningState<?, ?, ?, ?>) o;
return namespace.equals(that.namespace) && address.equals(that.address);
@@ -561,19 +561,19 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
}
- private static class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private final FlinkStateInternals<K> flinkStateInternals;
- FlinkKeyedAccumulatorCombiningState(
+ FlinkKeyedCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -591,7 +591,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -721,8 +721,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
return false;
}
- FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
- (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
+ FlinkKeyedCombiningState<?, ?, ?, ?> that =
+ (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
return namespace.equals(that.namespace) && address.equals(that.address);
@@ -736,11 +736,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
}
- private static class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
private final CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
@@ -748,9 +748,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private final FlinkStateInternals<K> flinkStateInternals;
private final CombineWithContext.Context context;
- FlinkAccumulatorCombiningStateWithContext(
+ FlinkCombiningStateWithContext(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
CombineWithContext.KeyedCombineFnWithContext<
? super K, InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
@@ -771,7 +771,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
return this;
}
@@ -896,8 +896,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
return false;
}
- FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
- (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
+ FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+ (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
return namespace.equals(that.namespace) && address.equals(that.address);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index f4e3ea8..7e7d1e1 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -32,8 +32,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkB
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
@@ -58,7 +58,7 @@ public class FlinkBroadcastStateInternalsTest {
private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -202,9 +202,9 @@ public class FlinkBroadcastStateInternalsTest {
@Test
public void testMergeCombiningValueIntoSource() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
value1.add(5);
@@ -223,11 +223,11 @@ public class FlinkBroadcastStateInternalsTest {
@Test
public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value3 =
+ CombiningState<Integer, int[], Integer> value3 =
underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
value1.add(5);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 27747dd..d140271 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
@@ -72,7 +72,7 @@ public class FlinkStateInternalsTest {
private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+ private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
@@ -232,9 +232,9 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeCombiningValueIntoSource() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
value1.add(5);
@@ -253,11 +253,11 @@ public class FlinkStateInternalsTest {
@Test
public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- AccumulatorCombiningState<Integer, int[], Integer> value1 =
+ CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value2 =
+ CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> value3 =
+ CombiningState<Integer, int[], Integer> value3 =
underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
value1.add(5);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 43fb383..725e9d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -137,31 +137,31 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key,
+ return new SparkCombiningState<>(namespace, address, accumCoder, key,
combineFn.<K>asKeyedFn());
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, combineFn);
+ return new SparkCombiningState<>(namespace, address, accumCoder, key, combineFn);
}
@Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key,
+ return new SparkCombiningState<>(namespace, address, accumCoder, key,
CombineFnUtil.bindContext(combineFn, c));
}
@@ -300,16 +300,16 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
}
- private class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+ private class SparkCombiningState<K, InputT, AccumT, OutputT>
extends AbstractState<AccumT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+ implements CombiningState<InputT, AccumT, OutputT> {
private final K key;
private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
- private SparkAccumulatorCombiningState(
+ private SparkCombiningState(
StateNamespace namespace,
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
K key,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
@@ -319,7 +319,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
+ public SparkCombiningState<K, InputT, AccumT, OutputT> readLater() {
return this;
}
[6/6] beam git commit: This closes #2413: Rename CombiningState and friends to be more user-friendly
Posted by ke...@apache.org.
This closes #2413: Rename CombiningState and friends to be more user-friendly
Upgrade Dataflow worker image to beam-master-20170405
Rename combiningValue to combining in StateSpecs
Rename AccumulatorCombiningState to CombiningState
Rename CombiningState to GroupingState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b92032ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b92032ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b92032ff
Branch: refs/heads/master
Commit: b92032ff6d3fba7d29dd73f602977137b7539482
Parents: bb18384 359040e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 6 15:33:28 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 15:33:28 2017 -0700
----------------------------------------------------------------------
.../translation/utils/ApexStateInternals.java | 28 ++---
.../utils/ApexStateInternalsTest.java | 18 +--
.../runners/core/InMemoryStateInternals.java | 36 +++---
.../apache/beam/runners/core/NonEmptyPanes.java | 4 +-
.../beam/runners/core/SideInputHandler.java | 18 +--
.../apache/beam/runners/core/StateMerging.java | 16 +--
.../org/apache/beam/runners/core/StateTag.java | 14 +--
.../org/apache/beam/runners/core/StateTags.java | 32 +++---
.../beam/runners/core/SystemReduceFn.java | 8 +-
.../AfterDelayFromFirstElementStateMachine.java | 10 +-
.../core/triggers/AfterPaneStateMachine.java | 4 +-
.../core/InMemoryStateInternalsTest.java | 18 +--
.../CopyOnAccessInMemoryStateInternals.java | 46 ++++----
.../CopyOnAccessInMemoryStateInternalsTest.java | 18 +--
.../state/FlinkBroadcastStateInternals.java | 68 ++++++------
.../state/FlinkKeyGroupStateInternals.java | 16 +--
.../state/FlinkSplitStateInternals.java | 16 +--
.../streaming/state/FlinkStateInternals.java | 68 ++++++------
.../FlinkBroadcastStateInternalsTest.java | 18 +--
.../streaming/FlinkStateInternalsTest.java | 18 +--
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../spark/stateful/SparkStateInternals.java | 30 ++---
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../beam/sdk/transforms/GroupIntoBatches.java | 12 +-
.../util/state/AccumulatorCombiningState.java | 53 ---------
.../apache/beam/sdk/util/state/BagState.java | 2 +-
.../beam/sdk/util/state/CombiningState.java | 27 +++--
.../beam/sdk/util/state/GroupingState.java | 42 +++++++
.../apache/beam/sdk/util/state/SetState.java | 2 +-
.../org/apache/beam/sdk/util/state/State.java | 2 +-
.../apache/beam/sdk/util/state/StateBinder.java | 12 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 110 +++++++++----------
.../beam/sdk/util/state/WatermarkHoldState.java | 2 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 59 +++++-----
34 files changed, 415 insertions(+), 416 deletions(-)
----------------------------------------------------------------------
[3/6] beam git commit: Rename CombiningState to GroupingState
Posted by ke...@apache.org.
Rename CombiningState to GroupingState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24c0495a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24c0495a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24c0495a
Branch: refs/heads/master
Commit: 24c0495a22dec9b7c44942794831b284f8caf78c
Parents: 0a17645
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:26:30 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700
----------------------------------------------------------------------
.../utils/ApexStateInternalsTest.java | 6 +--
.../apache/beam/runners/core/StateMerging.java | 4 +-
.../beam/runners/core/SystemReduceFn.java | 6 +--
.../AfterDelayFromFirstElementStateMachine.java | 4 +-
.../core/InMemoryStateInternalsTest.java | 6 +--
.../CopyOnAccessInMemoryStateInternalsTest.java | 14 +++----
.../FlinkBroadcastStateInternalsTest.java | 6 +--
.../streaming/FlinkStateInternalsTest.java | 6 +--
.../util/state/AccumulatorCombiningState.java | 4 +-
.../apache/beam/sdk/util/state/BagState.java | 2 +-
.../beam/sdk/util/state/CombiningState.java | 42 --------------------
.../beam/sdk/util/state/GroupingState.java | 42 ++++++++++++++++++++
.../apache/beam/sdk/util/state/SetState.java | 2 +-
.../org/apache/beam/sdk/util/state/State.java | 2 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 2 +-
.../beam/sdk/util/state/WatermarkHoldState.java | 2 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 2 +-
17 files changed, 76 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 3e83a7f..a1494ad 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -148,7 +148,7 @@ public class ApexStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -168,7 +168,7 @@ public class ApexStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index e98d098..593d697 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
@@ -159,7 +159,7 @@ public class StateMerging {
* Prefetch all combining value state for {@code address} across all merging windows in {@code
* context}.
*/
- public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void
+ public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void
prefetchCombiningValues(MergingStateAccessor<K, W> context,
StateTag<? super K, StateT> address) {
for (StateT state : context.accessInEachMergingWindow(address).values()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index bb7e4a9..0f2f790 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
/**
@@ -97,10 +97,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
};
}
- private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
+ private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag;
public SystemReduceFn(
- StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
+ StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) {
this.bufferTag = bufferTag;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 4444c22..29c29a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
@@ -169,7 +169,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
@Override
public void onElement(OnElementContext c) throws Exception {
- CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
+ GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
Instant oldDelayUntil = delayUntilState.read();
// Since processing time can only advance, resulting in target wake-up times we would
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 1da946f..5f90084 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -378,7 +378,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -398,7 +398,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index c7409bb..59c0a37 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.ValueState;
@@ -232,7 +232,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
- CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(0L));
underlyingValue.add(1L);
@@ -240,14 +240,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+ GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(1L));
copyOnAccessState.add(4L);
assertThat(copyOnAccessState.read(), equalTo(5L));
assertThat(underlyingValue.read(), equalTo(1L));
- CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
sumLongFn.getAccumulatorCoder(
reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
sumLongFn);
- CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(0L));
underlyingValue.add(1L);
@@ -273,14 +273,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+ GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(1L));
copyOnAccessState.add(4L);
assertThat(copyOnAccessState.read(), equalTo(5L));
assertThat(underlyingValue.read(), equalTo(1L));
- CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index db02cb3..f4e3ea8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -169,7 +169,7 @@ public class FlinkBroadcastStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -189,7 +189,7 @@ public class FlinkBroadcastStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 7839cf3..27747dd 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -199,7 +199,7 @@ public class FlinkStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -219,7 +219,7 @@ public class FlinkStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
index 8dd1678..6b120f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
@@ -21,14 +21,14 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
/**
* State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link CombiningState} that includes the {@code AccumT} type.
+ * to {@link GroupingState} that includes the {@code AccumT} type.
*
* @param <InputT> the type of values added to the state
* @param <AccumT> the type of accumulator
* @param <OutputT> the type of value extracted from the state
*/
public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
- extends CombiningState<InputT, OutputT> {
+ extends GroupingState<InputT, OutputT> {
/**
* Read the merged accumulator for this combining value. It is implied that reading the
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
index c7e6d13..e0eebe5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
@@ -22,7 +22,7 @@ package org.apache.beam.sdk.util.state;
*
* @param <T> The type of elements in the bag.
*/
-public interface BagState<T> extends CombiningState<T, Iterable<T>> {
+public interface BagState<T> extends GroupingState<T, Iterable<T>> {
@Override
BagState<T> readLater();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
deleted file mode 100644
index 1155262..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, OutputT> extends ReadableState<OutputT>, State {
- /**
- * Add a value to the buffer.
- */
- void add(InputT value);
-
- /**
- * Return true if this state is empty.
- */
- ReadableState<Boolean> isEmpty();
-
- @Override
- CombiningState<InputT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
new file mode 100644
index 0000000..bd7a8d9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
+ * {@code OutputT} value.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
+ /**
+ * Add a value to the buffer.
+ */
+ void add(InputT value);
+
+ /**
+ * Return true if this state is empty.
+ */
+ ReadableState<Boolean> isEmpty();
+
+ @Override
+ GroupingState<InputT, OutputT> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
index 93058b2..5c907d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
@@ -23,7 +23,7 @@ package org.apache.beam.sdk.util.state;
*
* @param <T> The type of elements in the set.
*/
-public interface SetState<T> extends CombiningState<T, Iterable<T>> {
+public interface SetState<T> extends GroupingState<T, Iterable<T>> {
/**
* Returns true if this set contains the specified element.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
index 973cb9c..3a49f01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.util.state;
* Base interface for all state locations.
*
* <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link CombiningState}.
+ * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
*/
public interface State {
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 6a8c80b..db4b7de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -445,7 +445,7 @@ public class StateSpecs {
@Override public void finishSpecifying() {
if (getAccumCoder() == null) {
- throw new IllegalStateException("Unable to infer a coder for CombiningState and no"
+ throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
+ " Coder was specified. Please set a coder by either invoking"
+ " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
+ " CombineFn<InputT, AccumT, OutputT> combineFn)"
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index 415cc6e..20fa05f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
*/
@Experimental(Kind.STATE)
public interface WatermarkHoldState<W extends BoundedWindow>
- extends CombiningState<Instant, Instant> {
+ extends GroupingState<Instant, Instant> {
/**
* Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
* an element timestamp, and to combine watermarks from windows which are about to be merged.
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 4249a77..cc67ac2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2495,7 +2495,7 @@ public class ParDoTest implements Serializable {
};
thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified.");
+ thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified.");
pipeline
.apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))