You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/06 22:34:14 UTC
[3/6] beam git commit: Rename CombiningState to GroupingState
Rename CombiningState to GroupingState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24c0495a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24c0495a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24c0495a
Branch: refs/heads/master
Commit: 24c0495a22dec9b7c44942794831b284f8caf78c
Parents: 0a17645
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Apr 3 11:26:30 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 6 11:57:21 2017 -0700
----------------------------------------------------------------------
.../utils/ApexStateInternalsTest.java | 6 +--
.../apache/beam/runners/core/StateMerging.java | 4 +-
.../beam/runners/core/SystemReduceFn.java | 6 +--
.../AfterDelayFromFirstElementStateMachine.java | 4 +-
.../core/InMemoryStateInternalsTest.java | 6 +--
.../CopyOnAccessInMemoryStateInternalsTest.java | 14 +++----
.../FlinkBroadcastStateInternalsTest.java | 6 +--
.../streaming/FlinkStateInternalsTest.java | 6 +--
.../util/state/AccumulatorCombiningState.java | 4 +-
.../apache/beam/sdk/util/state/BagState.java | 2 +-
.../beam/sdk/util/state/CombiningState.java | 42 --------------------
.../beam/sdk/util/state/GroupingState.java | 42 ++++++++++++++++++++
.../apache/beam/sdk/util/state/SetState.java | 2 +-
.../org/apache/beam/sdk/util/state/State.java | 2 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 2 +-
.../beam/sdk/util/state/WatermarkHoldState.java | 2 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 2 +-
17 files changed, 76 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 3e83a7f..a1494ad 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -148,7 +148,7 @@ public class ApexStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -168,7 +168,7 @@ public class ApexStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index e98d098..593d697 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
@@ -159,7 +159,7 @@ public class StateMerging {
* Prefetch all combining value state for {@code address} across all merging windows in {@code
* context}.
*/
- public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void
+ public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void
prefetchCombiningValues(MergingStateAccessor<K, W> context,
StateTag<? super K, StateT> address) {
for (StateT state : context.accessInEachMergingWindow(address).values()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index bb7e4a9..0f2f790 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
/**
@@ -97,10 +97,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
};
}
- private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
+ private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag;
public SystemReduceFn(
- StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
+ StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) {
this.bufferTag = bufferTag;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 4444c22..29c29a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
@@ -169,7 +169,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
@Override
public void onElement(OnElementContext c) throws Exception {
- CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
+ GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
Instant oldDelayUntil = delayUntilState.read();
// Since processing time can only advance, resulting in target wake-up times we would
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 1da946f..5f90084 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -378,7 +378,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -398,7 +398,7 @@ public class InMemoryStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index c7409bb..59c0a37 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.ValueState;
@@ -232,7 +232,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
- CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(0L));
underlyingValue.add(1L);
@@ -240,14 +240,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+ GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(1L));
copyOnAccessState.add(4L);
assertThat(copyOnAccessState.read(), equalTo(5L));
assertThat(underlyingValue.read(), equalTo(1L));
- CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
sumLongFn.getAccumulatorCoder(
reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
sumLongFn);
- CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(0L));
underlyingValue.add(1L);
@@ -273,14 +273,14 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
- CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
+ GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(1L));
copyOnAccessState.add(4L);
assertThat(copyOnAccessState.read(), equalTo(5L));
assertThat(underlyingValue.read(), equalTo(1L));
- CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
+ GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index db02cb3..f4e3ea8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -169,7 +169,7 @@ public class FlinkBroadcastStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -189,7 +189,7 @@ public class FlinkBroadcastStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 7839cf3..27747dd 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -199,7 +199,7 @@ public class FlinkStateInternalsTest {
@Test
public void testCombiningValue() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
@@ -219,7 +219,7 @@ public class FlinkStateInternalsTest {
@Test
public void testCombiningIsEmpty() throws Exception {
- CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
index 8dd1678..6b120f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java
@@ -21,14 +21,14 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
/**
* State for a single value that is managed by a {@link CombineFn}. This is an internal extension
- * to {@link CombiningState} that includes the {@code AccumT} type.
+ * to {@link GroupingState} that includes the {@code AccumT} type.
*
* @param <InputT> the type of values added to the state
* @param <AccumT> the type of accumulator
* @param <OutputT> the type of value extracted from the state
*/
public interface AccumulatorCombiningState<InputT, AccumT, OutputT>
- extends CombiningState<InputT, OutputT> {
+ extends GroupingState<InputT, OutputT> {
/**
* Read the merged accumulator for this combining value. It is implied that reading the
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
index c7e6d13..e0eebe5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java
@@ -22,7 +22,7 @@ package org.apache.beam.sdk.util.state;
*
* @param <T> The type of elements in the bag.
*/
-public interface BagState<T> extends CombiningState<T, Iterable<T>> {
+public interface BagState<T> extends GroupingState<T, Iterable<T>> {
@Override
BagState<T> readLater();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
deleted file mode 100644
index 1155262..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
- * {@code OutputT} value.
- *
- * @param <InputT> the type of values added to the state
- * @param <OutputT> the type of value extracted from the state
- */
-public interface CombiningState<InputT, OutputT> extends ReadableState<OutputT>, State {
- /**
- * Add a value to the buffer.
- */
- void add(InputT value);
-
- /**
- * Return true if this state is empty.
- */
- ReadableState<Boolean> isEmpty();
-
- @Override
- CombiningState<InputT, OutputT> readLater();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
new file mode 100644
index 0000000..bd7a8d9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single
+ * {@code OutputT} value.
+ *
+ * @param <InputT> the type of values added to the state
+ * @param <OutputT> the type of value extracted from the state
+ */
+public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State {
+ /**
+ * Add a value to the buffer.
+ */
+ void add(InputT value);
+
+ /**
+ * Return true if this state is empty.
+ */
+ ReadableState<Boolean> isEmpty();
+
+ @Override
+ GroupingState<InputT, OutputT> readLater();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
index 93058b2..5c907d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java
@@ -23,7 +23,7 @@ package org.apache.beam.sdk.util.state;
*
* @param <T> The type of elements in the set.
*/
-public interface SetState<T> extends CombiningState<T, Iterable<T>> {
+public interface SetState<T> extends GroupingState<T, Iterable<T>> {
/**
* Returns true if this set contains the specified element.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
index 973cb9c..3a49f01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.util.state;
* Base interface for all state locations.
*
* <p>Specific types of state add appropriate accessors for reading and writing values, see
- * {@link ValueState}, {@link BagState}, and {@link CombiningState}.
+ * {@link ValueState}, {@link BagState}, and {@link GroupingState}.
*/
public interface State {
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 6a8c80b..db4b7de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -445,7 +445,7 @@ public class StateSpecs {
@Override public void finishSpecifying() {
if (getAccumCoder() == null) {
- throw new IllegalStateException("Unable to infer a coder for CombiningState and no"
+ throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
+ " Coder was specified. Please set a coder by either invoking"
+ " StateSpecs.combiningValue(Coder<AccumT> accumCoder,"
+ " CombineFn<InputT, AccumT, OutputT> combineFn)"
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index 415cc6e..20fa05f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
*/
@Experimental(Kind.STATE)
public interface WatermarkHoldState<W extends BoundedWindow>
- extends CombiningState<Instant, Instant> {
+ extends GroupingState<Instant, Instant> {
/**
* Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given
* an element timestamp, and to combine watermarks from windows which are about to be merged.
http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 4249a77..cc67ac2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2495,7 +2495,7 @@ public class ParDoTest implements Serializable {
};
thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified.");
+ thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified.");
pipeline
.apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))