You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:19 UTC
[1/6] beam git commit: Make DoFnSignatures robust to StateSpec
subclasses
Repository: beam
Updated Branches:
refs/heads/master eec903fb5 -> b40b26501
Make DoFnSignatures robust to StateSpec subclasses
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/190422ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/190422ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/190422ca
Branch: refs/heads/master
Commit: 190422cac34732ad722d58d2fecf03571f5f08e9
Parents: 8fe59c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 11:54:07 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 11:55:18 2017 -0700
----------------------------------------------------------------------
.../sdk/transforms/reflect/DoFnSignatures.java | 25 +++++++++++++++-----
1 file changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/190422ca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 7beeeb8..666c7f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -1188,7 +1188,8 @@ public class DoFnSignatures {
}
Class<?> stateSpecRawType = field.getType();
- if (!(stateSpecRawType.equals(StateSpec.class))) {
+ if (!(TypeDescriptor.of(stateSpecRawType)
+ .isSubtypeOf(TypeDescriptor.of(StateSpec.class)))) {
errors.throwIllegalArgument(
"%s annotation on non-%s field [%s] that has class %s",
DoFn.StateId.class.getSimpleName(),
@@ -1208,14 +1209,26 @@ public class DoFnSignatures {
Type stateSpecType = field.getGenericType();
+ // A type descriptor for whatever type the @StateId-annotated class has, which
+ // must be some subtype of StateSpec
+ TypeDescriptor<? extends StateSpec<?>> stateSpecSubclassTypeDescriptor =
+ (TypeDescriptor) TypeDescriptor.of(stateSpecType);
+
+ // A type descriptor for StateSpec, with the generic type parameters filled
+ // in according to the specialization of the subclass (or just straight params)
+ TypeDescriptor<StateSpec<?>> stateSpecTypeDescriptor =
+ (TypeDescriptor)
+ stateSpecSubclassTypeDescriptor.getSupertype(StateSpec.class);
+
+ // The type of the state, which may still have free type variables from the
+ // context
+ Type unresolvedStateType =
+ ((ParameterizedType) stateSpecTypeDescriptor.getType()).getActualTypeArguments()[0];
+
// By static typing this is already a well-formed State subclass
TypeDescriptor<? extends State> stateType =
(TypeDescriptor<? extends State>)
- TypeDescriptor.of(fnClazz)
- .resolveType(
- TypeDescriptor.of(
- ((ParameterizedType) stateSpecType).getActualTypeArguments()[1])
- .getType());
+ TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType);
declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType));
}
[5/6] beam git commit: Simplify type parameters of StateSpec and
related
Posted by ke...@apache.org.
Simplify type parameters of StateSpec and related
Before this change, almost all uses of state had a type variable that
existing only to support the esoteric use of a KeyedCombineFn in
a state cell. KeyedCombineFn is now gone, so the key is no longer
required.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fe59c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fe59c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fe59c35
Branch: refs/heads/master
Commit: 8fe59c3555e65c21c0a0bbaa5a0ba9eac39316dd
Parents: eec903f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 20 20:46:37 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 11:55:18 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 4 +-
.../translation/utils/ApexStateInternals.java | 42 +++--
.../apex/translation/utils/NoOpStepContext.java | 2 +-
.../translation/utils/StateInternalsProxy.java | 6 +-
.../utils/ApexStateInternalsTest.java | 12 +-
.../construction/PTransformMatchersTest.java | 2 +-
.../beam/runners/core/BaseExecutionContext.java | 2 +-
.../beam/runners/core/ExecutionContext.java | 2 +-
.../GroupAlsoByWindowViaOutputBufferDoFn.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +-
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 2 +-
.../runners/core/InMemoryStateInternals.java | 36 ++--
.../runners/core/MergingActiveWindowSet.java | 4 +-
.../beam/runners/core/MergingStateAccessor.java | 2 +-
.../apache/beam/runners/core/NonEmptyPanes.java | 2 +-
.../beam/runners/core/PaneInfoTracker.java | 2 +-
.../runners/core/ReduceFnContextFactory.java | 37 ++--
.../beam/runners/core/ReduceFnRunner.java | 4 +-
.../beam/runners/core/SideInputHandler.java | 14 +-
.../beam/runners/core/SimpleDoFnRunner.java | 8 +-
.../beam/runners/core/SimpleOldDoFnRunner.java | 2 +-
.../beam/runners/core/SplittableParDo.java | 8 +-
.../apache/beam/runners/core/StateAccessor.java | 2 +-
.../beam/runners/core/StateInternals.java | 8 +-
.../runners/core/StateInternalsFactory.java | 2 +-
.../apache/beam/runners/core/StateMerging.java | 16 +-
.../apache/beam/runners/core/StateTable.java | 10 +-
.../org/apache/beam/runners/core/StateTag.java | 28 ++-
.../org/apache/beam/runners/core/StateTags.java | 70 ++++----
.../beam/runners/core/StatefulDoFnRunner.java | 6 +-
.../beam/runners/core/SystemReduceFn.java | 8 +-
.../core/TestInMemoryStateInternals.java | 6 +-
.../apache/beam/runners/core/WatermarkHold.java | 8 +-
.../beam/runners/core/WindowingInternals.java | 2 +-
.../AfterDelayFromFirstElementStateMachine.java | 2 +-
.../core/triggers/AfterPaneStateMachine.java | 2 +-
.../TriggerStateMachineContextFactory.java | 12 +-
.../triggers/TriggerStateMachineRunner.java | 2 +-
.../core/GroupAlsoByWindowsProperties.java | 10 +-
.../core/InMemoryStateInternalsTest.java | 16 +-
.../core/MergingActiveWindowSetTest.java | 2 +-
.../beam/runners/core/ReduceFnTester.java | 18 +-
.../beam/runners/core/SplittableParDoTest.java | 2 +-
.../apache/beam/runners/core/StateTagTest.java | 62 +++----
.../runners/core/StatefulDoFnRunnerTest.java | 4 +-
.../CopyOnAccessInMemoryStateInternals.java | 118 ++++++-------
.../runners/direct/DirectExecutionContext.java | 15 +-
.../beam/runners/direct/EvaluationContext.java | 8 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 6 +-
.../beam/runners/direct/ParDoEvaluator.java | 2 +-
...littableProcessElementsEvaluatorFactory.java | 2 +-
.../direct/StatefulParDoEvaluatorFactory.java | 5 +-
.../runners/direct/StepTransformResult.java | 4 +-
.../beam/runners/direct/TransformResult.java | 2 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 106 +++++------
.../beam/runners/direct/DirectRunnerTest.java | 1 +
.../runners/direct/EvaluationContextTest.java | 12 +-
.../StatefulParDoEvaluatorFactoryTest.java | 10 +-
.../functions/FlinkNoOpStepContext.java | 2 +-
.../functions/FlinkStatefulDoFnFunction.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 6 +-
.../streaming/SplittableDoFnOperator.java | 4 +-
.../wrappers/streaming/WindowDoFnOperator.java | 4 +-
.../state/FlinkBroadcastStateInternals.java | 45 +++--
.../state/FlinkKeyGroupStateInternals.java | 29 ++-
.../state/FlinkSplitStateInternals.java | 29 ++-
.../streaming/state/FlinkStateInternals.java | 48 ++---
.../flink/streaming/DoFnOperatorTest.java | 4 +-
.../FlinkBroadcastStateInternalsTest.java | 6 +-
.../FlinkKeyGroupStateInternalsTest.java | 2 +-
.../streaming/FlinkSplitStateInternalsTest.java | 2 +-
.../streaming/FlinkStateInternalsTest.java | 12 +-
.../BatchStatefulParDoOverridesTest.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../spark/stateful/SparkStateInternals.java | 44 +++--
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 2 +-
.../spark/translation/SparkProcessContext.java | 2 +-
.../spark/translation/TranslationUtils.java | 2 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 8 +-
.../beam/sdk/transforms/GroupIntoBatches.java | 6 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 2 +-
.../apache/beam/sdk/util/state/StateBinder.java | 28 +--
.../apache/beam/sdk/util/state/StateSpec.java | 15 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 92 +++++-----
.../apache/beam/sdk/transforms/ParDoTest.java | 177 +++++++++----------
.../transforms/reflect/DoFnInvokersTest.java | 2 +-
.../transforms/reflect/DoFnSignaturesTest.java | 26 +--
.../beam/fn/harness/fake/FakeStepContext.java | 2 +-
88 files changed, 692 insertions(+), 701 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index b66d818..d5dd0dd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -94,7 +94,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
private StateInternalsProxy<?> currentKeyStateInternals;
private final ApexTimerInternals<Object> currentKeyTimerInternals;
- private final StateInternals<Void> sideInputStateInternals;
+ private final StateInternals sideInputStateInternals;
private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
private LongMin pushedBackWatermark = new LongMin();
private long currentInputWatermark = Long.MIN_VALUE;
@@ -327,7 +327,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
NoOpStepContext stepContext = new NoOpStepContext() {
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return currentKeyStateInternals;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index e682894..4300567 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -65,7 +65,7 @@ import org.joda.time.Instant;
* <p>For fields that need to be serialized, use {@link ApexStateInternalsFactory}
* or {@link StateInternalsProxy}
*/
-public class ApexStateInternals<K> implements StateInternals<K> {
+public class ApexStateInternals<K> implements StateInternals {
private final K key;
private final Table<String, String, byte[]> stateTable;
@@ -80,46 +80,44 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@Override
public <T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
- return address.bind(new ApexStateBinder(key, namespace, address, c));
+ StateNamespace namespace, StateTag<T> address, final StateContext<?> c) {
+ return address.bind(new ApexStateBinder(namespace, address, c));
}
/**
* A {@link StateBinder} that returns {@link State} wrappers for serialized state.
*/
- private class ApexStateBinder implements StateBinder<K> {
- private final K key;
+ private class ApexStateBinder implements StateBinder {
private final StateNamespace namespace;
private final StateContext<?> c;
- private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address,
+ private ApexStateBinder(StateNamespace namespace, StateTag<?> address,
StateContext<?> c) {
- this.key = key;
this.namespace = namespace;
this.c = c;
}
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return new ApexValueState<>(namespace, address, coder);
}
@Override
public <T> BagState<T> bindBag(
- final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ final StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new ApexBagState<>(namespace, address, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
+ StateTag<SetState<T>> address,
Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
@@ -127,7 +125,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", MapState.class.getSimpleName()));
@@ -136,7 +134,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final CombineFn<InputT, AccumT, OutputT> combineFn) {
return new ApexCombiningState<>(
@@ -149,8 +147,8 @@ public class ApexStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
}
@@ -158,7 +156,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -167,12 +165,12 @@ public class ApexStateInternals<K> implements StateInternals<K> {
private class AbstractState<T> {
protected final StateNamespace namespace;
- protected final StateTag<?, ? extends State> address;
+ protected final StateTag<? extends State> address;
protected final Coder<T> coder;
private AbstractState(
StateNamespace namespace,
- StateTag<?, ? extends State> address,
+ StateTag<? extends State> address,
Coder<T> coder) {
this.namespace = namespace;
this.address = address;
@@ -233,7 +231,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
private ApexValueState(
StateNamespace namespace,
- StateTag<?, ValueState<T>> address,
+ StateTag<ValueState<T>> address,
Coder<T> coder) {
super(namespace, address, coder);
}
@@ -261,7 +259,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
public ApexWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState> address,
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
this.timestampCombiner = timestampCombiner;
@@ -312,7 +310,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
private final CombineFn<InputT, AccumT, OutputT> combineFn;
private ApexCombiningState(StateNamespace namespace,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
K key, CombineFn<InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
@@ -376,7 +374,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> {
private ApexBagState(
StateNamespace namespace,
- StateTag<?, BagState<T>> address,
+ StateTag<BagState<T>> address,
Coder<T> coder) {
super(namespace, address, ListCoder.of(coder));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index cc64c7c..721eecd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -60,7 +60,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
index 1f28364..746be2f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.util.state.StateContext;
* @param <K>
*/
@DefaultSerializer(JavaSerializer.class)
-public class StateInternalsProxy<K> implements StateInternals<K>, Serializable {
+public class StateInternalsProxy<K> implements StateInternals, Serializable {
private final StateInternalsFactory<K> factory;
private transient K currentKey;
@@ -55,12 +55,12 @@ public class StateInternalsProxy<K> implements StateInternals<K>, Serializable {
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
return factory.stateInternalsForKey(currentKey).state(namespace, address);
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address,
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address,
StateContext<?> c) {
return factory.stateInternalsForKey(currentKey).state(namespace, address, c);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 225b654..8b48a74 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -58,19 +58,19 @@ public class ApexStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ private static final StateTag<CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState>
+ private static final StateTag<WatermarkHoldState>
WATERMARK_EARLIEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
private ApexStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 9754bb3..33ba80c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -169,7 +169,7 @@ public class PTransformMatchersTest implements Serializable {
private final String stateId = "mystate";
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index cc7b574..aa1474c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -165,7 +165,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
}
@Override
- public abstract StateInternals<?> stateInternals();
+ public abstract StateInternals stateInternals();
@Override
public abstract TimerInternals timerInternals();
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index ecd30c0..c0d01ae 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -92,7 +92,7 @@ public interface ExecutionContext {
Coder<W> windowCoder)
throws IOException;
- StateInternals<?> stateInternals();
+ StateInternals stateInternals();
TimerInternals timerInternals();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 9e66f07..311ed1c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -57,7 +57,7 @@ public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends
InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());
- StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+ StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
new ReduceFnRunner<>(
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 651458f..34afa5d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -63,7 +63,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
K key = keyedWorkItem.key();
TimerInternals timerInternals = c.windowingInternals().timerInternals();
- StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+ StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
new ReduceFnRunner<>(
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index de4ac29..c553dbf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -114,7 +114,7 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
K key = keyedWorkItem.key();
- StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+ StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 2c02282..199ce41 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -53,7 +53,7 @@ import org.joda.time.Instant;
* and for running tests that need state.
*/
@Experimental(Kind.STATE)
-public class InMemoryStateInternals<K> implements StateInternals<K> {
+public class InMemoryStateInternals<K> implements StateInternals {
public static <K> InMemoryStateInternals<K> forKey(K key) {
return new InMemoryStateInternals<>(key);
@@ -79,10 +79,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
T copy();
}
- protected final StateTable<K> inMemoryState = new StateTable<K>() {
+ protected final StateTable inMemoryState = new StateTable() {
@Override
- protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) {
- return new InMemoryStateBinder<K>(key, c);
+ protected StateBinder binderForNamespace(StateNamespace namespace, StateContext<?> c) {
+ return new InMemoryStateBinder(c);
}
};
@@ -99,48 +99,46 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
return inMemoryState.get(namespace, address, StateContexts.nullContext());
}
@Override
public <T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
+ StateNamespace namespace, StateTag<T> address, final StateContext<?> c) {
return inMemoryState.get(namespace, address, c);
}
/**
* A {@link StateBinder} that returns In Memory {@link State} objects.
*/
- public static class InMemoryStateBinder<K> implements StateBinder<K> {
- private final K key;
+ public static class InMemoryStateBinder implements StateBinder {
private final StateContext<?> c;
- public InMemoryStateBinder(K key, StateContext<?> c) {
- this.key = key;
+ public InMemoryStateBinder(StateContext<?> c) {
this.c = c;
}
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return new InMemoryValue<T>();
}
@Override
public <T> BagState<T> bindBag(
- final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ final StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new InMemoryBag<T>();
}
@Override
- public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+ public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
return new InMemorySet<>();
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
return new InMemoryMap<>();
}
@@ -148,23 +146,23 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final CombineFn<InputT, AccumT, OutputT> combineFn) {
return new InMemoryCombiningState<>(combineFn);
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
- return new InMemoryWatermarkHold<W>(timestampCombiner);
+ return new InMemoryWatermarkHold(timestampCombiner);
}
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
index b4e864c..2faedbb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
@@ -67,10 +67,10 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
*/
private final ValueState<Map<W, Set<W>>> valueState;
- public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals<?> state) {
+ public MergingActiveWindowSet(WindowFn<Object, W> windowFn, StateInternals state) {
this.windowFn = windowFn;
- StateTag<Object, ValueState<Map<W, Set<W>>>> tag =
+ StateTag<ValueState<Map<W, Set<W>>>> tag =
StateTags.makeSystemTagInternal(StateTags.value(
"tree", MapCoder.of(windowFn.windowCoder(), SetCoder.of(windowFn.windowCoder()))));
valueState = state.state(StateNamespaces.global(), tag);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
index e948650..5ffb9a2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
@@ -37,5 +37,5 @@ public interface MergingStateAccessor<K, W extends BoundedWindow>
* are known to have state.
*/
<StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address);
+ StateTag<StateT> address);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 3e875c2..06dcc9c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -113,7 +113,7 @@ public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
extends NonEmptyPanes<K, W> {
- private static final StateTag<Object, CombiningState<Long, long[], Long>>
+ private static final StateTag<CombiningState<Long, long[], Long>>
PANE_ADDITIONS_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
"count", VarLongCoder.of(), Sum.ofLongs()));
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 4cf4d67..66b3960 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -43,7 +43,7 @@ public class PaneInfoTracker {
}
@VisibleForTesting
- static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
+ static final StateTag<ValueState<PaneInfo>> PANE_INFO_TAG =
StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
public void clear(StateAccessor<?> state) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 8493474..3031ebf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -51,7 +51,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private final K key;
private final ReduceFn<K, InputT, OutputT, W> reduceFn;
private final WindowingStrategy<?, W> windowingStrategy;
- private final StateInternals<K> stateInternals;
+ private final StateInternals stateInternals;
private final ActiveWindowSet<W> activeWindows;
private final TimerInternals timerInternals;
private final SideInputReader sideInputReader;
@@ -61,7 +61,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
K key,
ReduceFn<K, InputT, OutputT, W> reduceFn,
WindowingStrategy<?, W> windowingStrategy,
- StateInternals<K> stateInternals,
+ StateInternals stateInternals,
ActiveWindowSet<W> activeWindows,
TimerInternals timerInternals,
SideInputReader sideInputReader,
@@ -165,11 +165,15 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
protected final StateContext<W> context;
protected final StateNamespace windowNamespace;
protected final Coder<W> windowCoder;
- protected final StateInternals<K> stateInternals;
+ protected final StateInternals stateInternals;
protected final StateStyle style;
- public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
+ public StateAccessorImpl(
+ ActiveWindowSet<W> activeWindows,
+ Coder<W> windowCoder,
+ StateInternals stateInternals,
+ StateContext<W> context,
+ StateStyle style) {
this.activeWindows = activeWindows;
this.windowCoder = windowCoder;
@@ -196,7 +200,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
}
@Override
- public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+ public <StateT extends State> StateT access(StateTag<StateT> address) {
switch (style) {
case DIRECT:
return stateInternals.state(windowNamespace(), address, context);
@@ -212,8 +216,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
private final Collection<W> activeToBeMerged;
- public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
+ public MergingStateAccessorImpl(
+ ActiveWindowSet<W> activeWindows,
+ Coder<W> windowCoder,
+ StateInternals stateInternals,
+ StateStyle style,
+ Collection<W> activeToBeMerged,
W mergeResult) {
super(activeWindows, windowCoder, stateInternals,
stateContextForWindowOnly(mergeResult), style);
@@ -221,7 +229,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
}
@Override
- public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+ public <StateT extends State> StateT access(StateTag<StateT> address) {
switch (style) {
case DIRECT:
return stateInternals.state(windowNamespace(), address, context);
@@ -237,7 +245,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
@Override
public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address) {
+ StateTag<StateT> address) {
ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
for (W mergingWindow : activeToBeMerged) {
StateNamespace namespace = null;
@@ -258,8 +266,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
- public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, W window) {
+ public PremergingStateAccessorImpl(
+ ActiveWindowSet<W> activeWindows,
+ Coder<W> windowCoder,
+ StateInternals stateInternals,
+ W window) {
super(activeWindows, windowCoder, stateInternals,
stateContextForWindowOnly(window), StateStyle.RENAMED);
}
@@ -270,7 +281,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
@Override
public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address) {
+ StateTag<StateT> address) {
ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) {
StateT stateForWindow =
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 08014ec..d3dc067 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -108,7 +108,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
private final OutputWindowedValue<KV<K, OutputT>> outputter;
- private final StateInternals<K> stateInternals;
+ private final StateInternals stateInternals;
private final Counter droppedDueToClosedWindow;
@@ -214,7 +214,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
K key,
WindowingStrategy<?, W> windowingStrategy,
ExecutableTriggerStateMachine triggerStateMachine,
- StateInternals<K> stateInternals,
+ StateInternals stateInternals,
TimerInternals timerInternals,
OutputWindowedValue<KV<K, OutputT>> outputter,
SideInputReader sideInputReader,
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 26e920a..5c67148 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -61,7 +61,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
* be keep locally but if side inputs are broadcast to all parallel operators then all will
* have the same view of the state.
*/
- private final StateInternals<Void> stateInternals;
+ private final StateInternals stateInternals;
/**
* A state tag for each side input that we handle. The state is used to track
@@ -70,7 +70,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
private final Map<
PCollectionView<?>,
StateTag<
- Object,
CombiningState<
BoundedWindow,
Set<BoundedWindow>,
@@ -81,7 +80,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
*/
private final Map<
PCollectionView<?>,
- StateTag<Object, ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+ StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
/**
* Creates a new {@code SideInputHandler} for the given side inputs that uses
@@ -89,7 +88,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
*/
public SideInputHandler(
Collection<PCollectionView<?>> sideInputs,
- StateInternals<Void> stateInternals) {
+ StateInternals stateInternals) {
this.sideInputs = sideInputs;
this.stateInternals = stateInternals;
this.availableWindowsTags = new HashMap<>();
@@ -105,7 +104,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
.windowCoder();
StateTag<
- Object,
CombiningState<
BoundedWindow,
Set<BoundedWindow>,
@@ -117,7 +115,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
availableWindowsTags.put(sideInput, availableTag);
Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
- StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
sideInputContentsTags.put(sideInput, stateTag);
}
@@ -145,7 +143,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
inputWithReifiedWindows.add(value.withValue(e));
}
- StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
sideInputContentsTags.get(sideInput);
for (BoundedWindow window: value.getWindows()) {
@@ -175,7 +173,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
.getWindowFn()
.windowCoder();
- StateTag<Object, ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
sideInputContentsTags.get(sideInput);
ValueState<Iterable<WindowedValue<?>>> state =
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 1865d54..edce9a2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -614,8 +614,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
@Override
public State state(String stateId) {
try {
- StateSpec<?, ?> spec =
- (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
+ StateSpec<?> spec =
+ (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn);
return stepContext
.stateInternals()
.state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
@@ -726,8 +726,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
@Override
public State state(String stateId) {
try {
- StateSpec<?, ?> spec =
- (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
+ StateSpec<?> spec =
+ (StateSpec<?>) signature.stateDeclarations().get(stateId).field().get(fn);
return stepContext
.stateInternals()
.state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index b8db491..b5f8f45 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -489,7 +489,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return context.stepContext.stateInternals();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 0fa6f76..94f5f35 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -353,7 +353,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* the input watermark when the first {@link DoFn.ProcessElement} call for this element
* completes.
*/
- private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag =
+ private static final StateTag<WatermarkHoldState> watermarkHoldTag =
StateTags.makeSystemTagInternal(
StateTags.<GlobalWindow>watermarkStateInternal(
"hold", TimestampCombiner.LATEST));
@@ -363,13 +363,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* DoFn.ProcessElement} call and read during subsequent calls in response to timer firings, when
* the original element is no longer available.
*/
- private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag;
+ private final StateTag<ValueState<WindowedValue<InputT>>> elementTag;
/**
* The state cell containing a restriction representing the unprocessed part of work for this
* element.
*/
- private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
+ private StateTag<ValueState<RestrictionT>> restrictionTag;
private final DoFn<InputT, OutputT> fn;
private final Coder<InputT> elementCoder;
@@ -453,7 +453,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
@ProcessElement
public void processElement(final ProcessContext c) {
String key = c.element().key();
- StateInternals<String> stateInternals =
+ StateInternals stateInternals =
stateInternalsFactory.stateInternalsForKey(key);
TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
index 87353f2..eda896b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
@@ -34,5 +34,5 @@ public interface StateAccessor<K> {
* <p>Never accounts for merged windows. When windows are merged, any state accessed via
* this method must be eagerly combined and written into the result window.
*/
- <StateT extends State> StateT access(StateTag<? super K, StateT> address);
+ <StateT extends State> StateT access(StateTag<StateT> address);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
index e6440bf..c2e9412 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
@@ -40,20 +40,20 @@ import org.apache.beam.sdk.util.state.StateContext;
* used directly, and is highly likely to change.
*/
@Experimental(Kind.STATE)
-public interface StateInternals<K> {
+public interface StateInternals {
/** The key for this {@link StateInternals}. */
- K getKey();
+ Object getKey();
/**
* Return the state associated with {@code address} in the specified {@code namespace}.
*/
- <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address);
+ <T extends State> T state(StateNamespace namespace, StateTag<T> address);
/**
* Return the state associated with {@code address} in the specified {@code namespace}
* with the {@link StateContext}.
*/
<T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c);
+ StateNamespace namespace, StateTag<T> address, StateContext<?> c);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
index ea7d742..c756364 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
@@ -31,5 +31,5 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
public interface StateInternalsFactory<K> {
/** Returns {@link StateInternals} for the provided key. */
- StateInternals<K> stateInternalsForKey(K key);
+ StateInternals stateInternalsForKey(K key);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index ce37fd3..f6b9103 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -42,7 +42,7 @@ public class StateMerging {
* in {@code context}.
*/
public static <K, StateT extends State, W extends BoundedWindow> void clear(
- MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) {
+ MergingStateAccessor<K, W> context, StateTag<StateT> address) {
for (StateT state : context.accessInEachMergingWindow(address).values()) {
state.clear();
}
@@ -54,7 +54,7 @@ public class StateMerging {
* blindly append to.
*/
public static <K, T, W extends BoundedWindow> void prefetchBags(
- MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
+ MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
if (map.isEmpty()) {
// Nothing to prefetch.
@@ -73,7 +73,7 @@ public class StateMerging {
* Merge all bag state in {@code address} across all windows under merge.
*/
public static <K, T, W extends BoundedWindow> void mergeBags(
- MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
+ MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address));
}
@@ -116,7 +116,7 @@ public class StateMerging {
* Merge all set state in {@code address} across all windows under merge.
*/
public static <K, T, W extends BoundedWindow> void mergeSets(
- MergingStateAccessor<K, W> context, StateTag<? super K, SetState<T>> address) {
+ MergingStateAccessor<K, W> context, StateTag<SetState<T>> address) {
mergeSets(context.accessInEachMergingWindow(address).values(), context.access(address));
}
@@ -161,7 +161,7 @@ public class StateMerging {
*/
public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void
prefetchCombiningValues(MergingStateAccessor<K, W> context,
- StateTag<? super K, StateT> address) {
+ StateTag<StateT> address) {
for (StateT state : context.accessInEachMergingWindow(address).values()) {
prefetchRead(state);
}
@@ -172,7 +172,7 @@ public class StateMerging {
*/
public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(
MergingStateAccessor<K, W> context,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address) {
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address) {
mergeCombiningValues(
context.accessInEachMergingWindow(address).values(), context.access(address));
}
@@ -218,7 +218,7 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void prefetchWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState> address) {
+ StateTag<WatermarkHoldState> address) {
Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
WatermarkHoldState result = context.access(address);
if (map.isEmpty()) {
@@ -250,7 +250,7 @@ public class StateMerging {
*/
public static <K, W extends BoundedWindow> void mergeWatermarks(
MergingStateAccessor<K, W> context,
- StateTag<? super K, WatermarkHoldState> address,
+ StateTag<WatermarkHoldState> address,
W mergeResult) {
mergeWatermarks(
context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
index d2511c9..1bf4ff5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.util.state.StateContext;
/**
* Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code State} instance.
*/
-public abstract class StateTable<K> {
+public abstract class StateTable {
- private final Table<StateNamespace, StateTag<? super K, ?>, State> stateTable =
+ private final Table<StateNamespace, StateTag<?>, State> stateTable =
HashBasedTable.create();
/**
@@ -39,7 +39,7 @@ public abstract class StateTable<K> {
* already present in this {@link StateTable}.
*/
public <StateT extends State> StateT get(
- StateNamespace namespace, StateTag<? super K, StateT> tag, StateContext<?> c) {
+ StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) {
State storage = stateTable.get(namespace, tag);
if (storage != null) {
@SuppressWarnings("unchecked")
@@ -68,7 +68,7 @@ public abstract class StateTable<K> {
return stateTable.containsRow(namespace);
}
- public Map<StateTag<? super K, ?>, State> getTagsInUse(StateNamespace namespace) {
+ public Map<StateTag<?>, State> getTagsInUse(StateNamespace namespace) {
return stateTable.row(namespace);
}
@@ -80,5 +80,5 @@ public abstract class StateTable<K> {
* Provide the {@code StateBinder} to use for creating {@code Storage} instances
* in the specified {@code namespace}.
*/
- protected abstract StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c);
+ protected abstract StateBinder binderForNamespace(StateNamespace namespace, StateContext<?> c);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index aaeecf0..38e9dea 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -46,12 +45,10 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState;
*
* <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}.
*
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- * accept values of type {@code KeyedStateTag<? super K, StateT>}.
* @param <StateT> The type of state being tagged.
*/
@Experimental(Kind.STATE)
-public interface StateTag<K, StateT extends State> extends Serializable {
+public interface StateTag<StateT extends State> extends Serializable {
/** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
void appendTo(Appendable sb) throws IOException;
@@ -64,7 +61,7 @@ public interface StateTag<K, StateT extends State> extends Serializable {
/**
* The specification for the state stored in the referenced cell.
*/
- StateSpec<K, StateT> getSpec();
+ StateSpec<StateT> getSpec();
/**
* Bind this state tag. See {@link StateSpec#bind}.
@@ -72,35 +69,34 @@ public interface StateTag<K, StateT extends State> extends Serializable {
* @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now.
*/
@Deprecated
- StateT bind(StateBinder<? extends K> binder);
+ StateT bind(StateBinder binder);
/**
* Visitor for binding a {@link StateSpec} and to the associated {@link State}.
*
- * @param <K> the type of key this binder embodies.
* @deprecated for migration only; runners should reference the top level {@link StateBinder}
* and move towards {@link StateSpec} rather than {@link StateTag}.
*/
@Deprecated
- public interface StateBinder<K> {
- <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder);
+ public interface StateBinder {
+ <T> ValueState<T> bindValue(StateTag<ValueState<T>> spec, Coder<T> coder);
- <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder);
+ <T> BagState<T> bindBag(StateTag<BagState<T>> spec, Coder<T> elemCoder);
- <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder);
+ <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder);
<KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
<InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
@@ -110,7 +106,7 @@ public interface StateTag<K, StateT extends State> extends Serializable {
* <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps
* added to the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
+ WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> spec, TimestampCombiner timestampCombiner);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index fe99f27..ca8b238 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -50,29 +50,29 @@ public class StateTags {
/** @deprecated for migration purposes only */
@Deprecated
- private static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) {
- return new StateBinder<K>() {
+ private static StateBinder adaptTagBinder(final StateTag.StateBinder binder) {
+ return new StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) {
+ String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
return binder.bindValue(tagForSpec(id, spec), coder);
}
@Override
public <T> BagState<T> bindBag(
- String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) {
+ String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
return binder.bindBag(tagForSpec(id, spec), elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+ String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
return binder.bindSet(tagForSpec(id, spec), elemCoder);
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
+ String id, StateSpec<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
return binder.bindMap(tagForSpec(id, spec), mapKeyCoder, mapValueCoder);
}
@@ -81,7 +81,7 @@ public class StateTags {
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombining(
String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
@@ -91,7 +91,7 @@ public class StateTags {
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return binder.bindCombiningValueWithContext(
@@ -99,9 +99,9 @@ public class StateTags {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ public WatermarkHoldState bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState> spec,
+ StateSpec<WatermarkHoldState> spec,
TimestampCombiner timestampCombiner) {
return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner);
}
@@ -121,20 +121,20 @@ public class StateTags {
private StateTags() { }
- private interface SystemStateTag<K, StateT extends State> {
- StateTag<K, StateT> asKind(StateKind kind);
+ private interface SystemStateTag<StateT extends State> {
+ StateTag<StateT> asKind(StateKind kind);
}
/** Create a state tag for the given id and spec. */
- public static <K, StateT extends State> StateTag<K, StateT> tagForSpec(
- String id, StateSpec<K, StateT> spec) {
+ public static <StateT extends State> StateTag<StateT> tagForSpec(
+ String id, StateSpec<StateT> spec) {
return new SimpleStateTag<>(new StructuredId(id), spec);
}
/**
* Create a simple state tag for values of type {@code T}.
*/
- public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) {
+ public static <T> StateTag<ValueState<T>> value(String id, Coder<T> valueCoder) {
return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder));
}
@@ -143,7 +143,7 @@ public class StateTags {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+ StateTag<CombiningState<InputT, AccumT, OutputT>>
combiningValue(
String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
@@ -155,7 +155,7 @@ public class StateTags {
* merge multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+ StateTag<CombiningState<InputT, AccumT, OutputT>>
combiningValueWithContext(
String id,
Coder<AccumT> accumCoder,
@@ -172,7 +172,7 @@ public class StateTags {
* should only be used to initialize static values.
*/
public static <InputT, AccumT, OutputT>
- StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+ StateTag<CombiningState<InputT, AccumT, OutputT>>
combiningValueFromInputInternal(
String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SimpleStateTag<>(
@@ -183,21 +183,21 @@ public class StateTags {
* Create a state tag that is optimized for adding values frequently, and
* occasionally retrieving all the values that have been added.
*/
- public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
+ public static <T> StateTag<BagState<T>> bag(String id, Coder<T> elemCoder) {
return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder));
}
/**
* Create a state spec that supporting for {@link java.util.Set} like access patterns.
*/
- public static <T> StateTag<Object, SetState<T>> set(String id, Coder<T> elemCoder) {
+ public static <T> StateTag<SetState<T>> set(String id, Coder<T> elemCoder) {
return new SimpleStateTag<>(new StructuredId(id), StateSpecs.set(elemCoder));
}
/**
* Create a state spec that supporting for {@link java.util.Map} like access patterns.
*/
- public static <K, V> StateTag<Object, MapState<K, V>> map(
+ public static <K, V> StateTag<MapState<K, V>> map(
String id, Coder<K> keyCoder, Coder<V> valueCoder) {
return new SimpleStateTag<>(new StructuredId(id), StateSpecs.map(keyCoder, valueCoder));
}
@@ -205,7 +205,7 @@ public class StateTags {
/**
* Create a state tag for holding the watermark.
*/
- public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState>
+ public static <W extends BoundedWindow> StateTag<WatermarkHoldState>
watermarkStateInternal(String id, TimestampCombiner timestampCombiner) {
return new SimpleStateTag<>(
new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner));
@@ -215,20 +215,20 @@ public class StateTags {
* Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to
* collide with any user tags.
*/
- public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
- StateTag<K, StateT> tag) {
+ public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
+ StateTag<StateT> tag) {
if (!(tag instanceof SystemStateTag)) {
throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
}
// Checked above
@SuppressWarnings("unchecked")
- SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag;
+ SystemStateTag<StateT> typedTag = (SystemStateTag<StateT>) tag;
return typedTag.asKind(StateKind.SYSTEM);
}
- public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
+ public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>>
convertToBagTagInternal(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> combiningTag) {
+ StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
return new SimpleStateTag<>(
new StructuredId(combiningTag.getId()),
StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
@@ -291,20 +291,20 @@ public class StateTags {
/**
* A basic {@link StateTag} implementation that manages the structured ids.
*/
- private static class SimpleStateTag<K, StateT extends State>
- implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
+ private static class SimpleStateTag<StateT extends State>
+ implements StateTag<StateT>, SystemStateTag<StateT> {
- private final StateSpec<K, StateT> spec;
+ private final StateSpec<StateT> spec;
private final StructuredId id;
- public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) {
+ public SimpleStateTag(StructuredId id, StateSpec<StateT> spec) {
this.id = id;
this.spec = spec;
}
@Override
@Deprecated
- public StateT bind(StateTag.StateBinder<? extends K> binder) {
+ public StateT bind(StateTag.StateBinder binder) {
return spec.bind(
this.id.getRawId(), adaptTagBinder(binder));
}
@@ -315,7 +315,7 @@ public class StateTags {
}
@Override
- public StateSpec<K, StateT> getSpec() {
+ public StateSpec<StateT> getSpec() {
return spec;
}
@@ -332,7 +332,7 @@ public class StateTags {
}
@Override
- public StateTag<K, StateT> asKind(StateKind kind) {
+ public StateTag<StateT> asKind(StateKind kind) {
return new SimpleStateTag<>(id.asKind(kind), spec);
}
@@ -342,7 +342,7 @@ public class StateTags {
return false;
}
- SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other;
+ SimpleStateTag<?> otherTag = (SimpleStateTag<?>) other;
return Objects.equals(this.getId(), otherTag.getId())
&& Objects.equals(this.getSpec(), otherTag.getSpec());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 7a20590..e3717a8 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -236,12 +236,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
private final DoFn<?, ?> fn;
private final DoFnSignature signature;
- private final StateInternals<?> stateInternals;
+ private final StateInternals stateInternals;
private final Coder<W> windowCoder;
public StateInternalsStateCleaner(
DoFn<?, ?> fn,
- StateInternals<?> stateInternals,
+ StateInternals stateInternals,
Coder<W> windowCoder) {
this.fn = fn;
this.signature = DoFnSignatures.getSignature(fn.getClass());
@@ -254,7 +254,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
signature.stateDeclarations().entrySet()) {
try {
- StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
+ StateSpec<?> spec = (StateSpec<?>) entry.getValue().field().get(fn);
State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
state.clear();
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 86a7fd7..f18460a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -47,7 +47,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
*/
public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
buffering(final Coder<T> inputCoder) {
- final StateTag<Object, BagState<T>> bufferTag =
+ final StateTag<BagState<T>> bufferTag =
StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
@Override
@@ -70,7 +70,7 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
AccumT, OutputT, W>
combining(
final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag;
+ final StateTag<CombiningState<InputT, AccumT, OutputT>> bufferTag;
if (combineFn.getFn() instanceof CombineFnWithContext) {
bufferTag = StateTags.makeSystemTagInternal(
StateTags.<InputT, AccumT, OutputT>combiningValueWithContext(
@@ -96,10 +96,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
};
}
- private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag;
+ private StateTag<? extends GroupingState<InputT, OutputT>> bufferTag;
public SystemReduceFn(
- StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) {
+ StateTag<? extends GroupingState<InputT, OutputT>> bufferTag) {
this.bufferTag = bufferTag;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index 1dfb85f..18b50db 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -32,9 +32,9 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
super(key);
}
- public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
- Set<StateTag<? super K, ?>> inUse = new HashSet<>();
- for (Map.Entry<StateTag<? super K, ?>, State> entry :
+ public Set<StateTag<?>> getTagsInUse(StateNamespace namespace) {
+ Set<StateTag<?>> inUse = new HashSet<>();
+ for (Map.Entry<StateTag<?>, State> entry :
inMemoryState.getTagsInUse(namespace).entrySet()) {
if (!isEmptyForTesting(entry.getValue())) {
inUse.add(entry.getKey());
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 9bb9c62..e6e4ffb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -54,9 +54,9 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* used for elements.
*/
public static <W extends BoundedWindow>
- StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
+ StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
TimestampCombiner timestampCombiner) {
- return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal(
+ return StateTags.<WatermarkHoldState>makeSystemTagInternal(
StateTags.<W>watermarkStateInternal("hold", timestampCombiner));
}
@@ -67,13 +67,13 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
* would take the end-of-window time as its element time.)
*/
@VisibleForTesting
- public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG =
+ public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG =
StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
"extra", TimestampCombiner.EARLIEST));
private final TimerInternals timerInternals;
private final WindowingStrategy<?, W> windowingStrategy;
- private final StateTag<Object, WatermarkHoldState> elementHoldTag;
+ private final StateTag<WatermarkHoldState> elementHoldTag;
public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
this.timerInternals = timerInternals;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
index 5005065..a824a7b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -40,7 +40,7 @@ public interface WindowingInternals<InputT, OutputT> {
* Unsupported state internals. The key type is unknown. It is up to the user to use the
* correct type of key.
*/
- StateInternals<?> stateInternals();
+ StateInternals stateInternals();
/**
* Output the value at the specified timestamp in the listed windows.
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index b416788..ed2c26f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -55,7 +55,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
ImmutableList.<SerializableFunction<Instant, Instant>>of();
- protected static final StateTag<Object, CombiningState<Instant,
+ protected static final StateTag<CombiningState<Instant,
Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
"delayed", InstantCoder.of(), Min.<Instant>naturalOrder()));
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 11323cc..52fb5ff 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.state.CombiningState;
@Experimental(Experimental.Kind.TRIGGER)
public class AfterPaneStateMachine extends OnceTriggerStateMachine {
-private static final StateTag<Object, CombiningState<Long, long[], Long>>
+private static final StateTag<CombiningState<Long, long[], Long>>
ELEMENTS_IN_PANE_TAG =
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
"count", VarLongCoder.of(), Sum.ofLongs()));
[3/6] beam git commit: Simplify type parameters of StateSpec and
related
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 31e931c..cfe3f9b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
@@ -62,7 +61,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
* <p>Note: Ignore index of key.
* Mainly for SideInputs.
*/
-public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
+public class FlinkBroadcastStateInternals<K> implements StateInternals {
private int indexInSubtaskGroup;
private final DefaultOperatorStateBackend stateBackend;
@@ -86,7 +85,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -94,36 +93,36 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -133,7 +132,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
@@ -144,7 +143,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new FlinkCombiningStateWithContext<>(
@@ -158,8 +157,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
@@ -302,11 +301,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
extends AbstractBroadcastState<T> implements ValueState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, ValueState<T>> address;
+ private final StateTag<ValueState<T>> address;
FlinkBroadcastValueState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, ValueState<T>> address,
+ StateTag<ValueState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
super(flinkStateBackend, address.getId(), namespace, coder);
@@ -363,11 +362,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements BagState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
FlinkBroadcastBagState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));
@@ -451,12 +450,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
FlinkCombiningState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder) {
@@ -568,13 +567,13 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
FlinkKeyedCombiningState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -704,14 +703,14 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
private final CombineWithContext.Context context;
FlinkCombiningStateWithContext(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 67d7966..c9b7797 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
@@ -67,7 +66,7 @@ import org.apache.flink.util.Preconditions;
*
* <p>Reference from {@link HeapInternalTimerService} to the local key-group range.
*/
-public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
+public class FlinkKeyGroupStateInternals<K> implements StateInternals {
private final Coder<K> keyCoder;
private final KeyGroupsList localKeyGroupRange;
@@ -109,7 +108,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -117,36 +116,36 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", ValueState.class.getSimpleName()));
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -156,7 +155,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -165,7 +164,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException(
@@ -173,8 +172,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -334,10 +333,10 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
implements BagState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
FlinkKeyGroupBagState(
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
super(address.getId(), namespace.stringKey(), ListCoder.of(coder),
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index ef6c3b2..3d38f88 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -53,7 +52,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
* Ignore index of key and namespace.
* Just implement BagState.
*/
-public class FlinkSplitStateInternals<K> implements StateInternals<K> {
+public class FlinkSplitStateInternals<K> implements StateInternals {
private final OperatorStateBackend stateBackend;
@@ -69,7 +68,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -77,36 +76,36 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", ValueState.class.getSimpleName()));
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -116,7 +115,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -125,7 +124,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException(
@@ -133,8 +132,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -147,11 +146,11 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
private final ListStateDescriptor<T> descriptor;
private OperatorStateBackend flinkStateBackend;
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
FlinkSplitBagState(
OperatorStateBackend flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
this.flinkStateBackend = flinkStateBackend;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c99d085..c033be6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
* <p>Note: In the Flink streaming runner the key is always encoded
* using an {@link Coder} and stored in a {@link ByteBuffer}.
*/
-public class FlinkStateInternals<K> implements StateInternals<K> {
+public class FlinkStateInternals<K> implements StateInternals {
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private Coder<K> keyCoder;
@@ -95,7 +95,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -103,36 +103,36 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -142,7 +142,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
@@ -153,7 +153,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new FlinkCombiningStateWithContext<>(
@@ -167,8 +167,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return new FlinkWatermarkHoldState<>(
@@ -180,13 +180,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private static class FlinkValueState<K, T> implements ValueState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, ValueState<T>> address;
+ private final StateTag<ValueState<T>> address;
private final ValueStateDescriptor<T> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkValueState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, ValueState<T>> address,
+ StateTag<ValueState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
@@ -266,13 +266,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private static class FlinkBagState<K, T> implements BagState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
private final ListStateDescriptor<T> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkBagState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
@@ -379,14 +379,14 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder) {
@@ -547,7 +547,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -555,7 +555,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
FlinkKeyedCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -718,7 +718,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -727,7 +727,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
FlinkCombiningStateWithContext(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -886,7 +886,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
implements WatermarkHoldState {
- private final StateTag<? super K, WatermarkHoldState> address;
+ private final StateTag<WatermarkHoldState> address;
private final TimestampCombiner timestampCombiner;
private final StateNamespace namespace;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -896,7 +896,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
public FlinkWatermarkHoldState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
FlinkStateInternals<K> flinkStateInternals,
- StateTag<? super K, WatermarkHoldState> address,
+ StateTag<WatermarkHoldState> address,
StateNamespace namespace,
TimestampCombiner timestampCombiner) {
this.address = address;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4e18ac2..bda30e4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -196,7 +196,7 @@ public class DoFnOperatorTest {
DoFn<Integer, String> fn = new DoFn<Integer, String>() {
@StateId("state")
- private final StateSpec<Object, ValueState<String>> stateSpec =
+ private final StateSpec<ValueState<String>> stateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
@@ -296,7 +296,7 @@ public class DoFnOperatorTest {
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> stateSpec =
+ private final StateSpec<ValueState<String>> stateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 7e7d1e1..eb2c05f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -56,12 +56,12 @@ public class FlinkBroadcastStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ private static final StateTag<CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
FlinkBroadcastStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
index 5433d07..0e0267b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
@@ -64,7 +64,7 @@ public class FlinkKeyGroupStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
FlinkKeyGroupStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
index 08ae0c4..8033a9d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
@@ -47,7 +47,7 @@ public class FlinkSplitStateInternalsTest {
private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
FlinkSplitStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 17c43bf..cd00d9e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,18 +70,18 @@ public class FlinkStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ private static final StateTag<CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
FlinkStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index ce7f678..38129ab 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -103,7 +103,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
private static class DummyStatefulDoFn extends DoFn<KV<Integer, Integer>, Integer> {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
+ private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
@ProcessElement
public void processElem(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 343d51b..63e1166 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -890,7 +890,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
ParDo.of(
new DoFn<KV<Integer, Integer>, Integer>() {
@StateId("unused")
- final StateSpec<Object, ValueState<Integer>> stateSpec =
+ final StateSpec<ValueState<Integer>> stateSpec =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index cdc23ff..afaba3a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
@@ -51,7 +50,7 @@ import org.joda.time.Instant;
/**
* An implementation of {@link StateInternals} for the SparkRunner.
*/
-class SparkStateInternals<K> implements StateInternals<K> {
+class SparkStateInternals<K> implements StateInternals {
private final K key;
//Serializable state for internals (namespace to state tag to coded value).
@@ -86,50 +85,47 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@Override
public <T extends State> T state(
StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
StateContext<?> c) {
- return address.bind(new SparkStateBinder(key, namespace, c));
+ return address.bind(new SparkStateBinder(namespace, c));
}
- private class SparkStateBinder implements StateBinder<K> {
- private final K key;
+ private class SparkStateBinder implements StateBinder {
private final StateNamespace namespace;
private final StateContext<?> c;
- private SparkStateBinder(K key,
- StateNamespace namespace,
+ private SparkStateBinder(StateNamespace namespace,
StateContext<?> c) {
- this.key = key;
this.namespace = namespace;
this.c = c;
}
@Override
- public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
return new SparkValueState<>(namespace, address, coder);
}
@Override
- public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new SparkBagState<>(namespace, address, elemCoder);
}
@Override
- public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+ public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", MapState.class.getSimpleName()));
@@ -138,7 +134,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SparkCombiningState<>(namespace, address, accumCoder, combineFn);
@@ -147,7 +143,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new SparkCombiningState<>(
@@ -155,8 +151,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
}
@@ -164,12 +160,12 @@ class SparkStateInternals<K> implements StateInternals<K> {
private class AbstractState<T> {
final StateNamespace namespace;
- final StateTag<?, ? extends State> address;
+ final StateTag<? extends State> address;
final Coder<T> coder;
private AbstractState(
StateNamespace namespace,
- StateTag<?, ? extends State> address,
+ StateTag<? extends State> address,
Coder<T> coder) {
this.namespace = namespace;
this.address = address;
@@ -218,7 +214,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
private SparkValueState(
StateNamespace namespace,
- StateTag<?, ValueState<T>> address,
+ StateTag<ValueState<T>> address,
Coder<T> coder) {
super(namespace, address, coder);
}
@@ -246,7 +242,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
public SparkWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState> address,
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
this.timestampCombiner = timestampCombiner;
@@ -300,7 +296,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
private SparkCombiningState(
StateNamespace namespace,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
@@ -363,7 +359,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
private final class SparkBagState<T> extends AbstractState<List<T>> implements BagState<T> {
private SparkBagState(
StateNamespace namespace,
- StateTag<?, BagState<T>> address,
+ StateTag<BagState<T>> address,
Coder<T> coder) {
super(namespace, address, ListCoder.of(coder));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 0a00c45..063feef 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -93,7 +93,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());
- StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+ StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 3e8dde5..ffe343b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -124,7 +124,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
Coder<W> windowCoder) throws IOException { }
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index ef1ff9f..7b6f9ed 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -65,7 +65,7 @@ public final class TranslationUtils {
*/
static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable {
@Override
- public StateInternals<K> stateInternalsForKey(K key) {
+ public StateInternals stateInternalsForKey(K key) {
return InMemoryStateInternals.forKey(key);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9b99ca4..0368476 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -331,10 +331,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/**
* Annotation for declaring and dereferencing state cells.
*
- * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link
- * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State}
- * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and
- * annotate it with {@link StateId}. See the following code for an example:
+ * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a
+ * {@link StateId}. To use the cell during processing, add a parameter of the appropriate {@link
+ * State} subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer}
+ * method, and annotate it with {@link StateId}. See the following code for an example:
*
* <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
*
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index b5547e3..02f3a85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -112,14 +112,14 @@ public class GroupIntoBatches<K, InputT>
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId(BATCH_ID)
- private final StateSpec<Object, BagState<InputT>> batchSpec;
+ private final StateSpec<BagState<InputT>> batchSpec;
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- private final StateSpec<Object, CombiningState<Long, Long, Long>>
+ private final StateSpec<CombiningState<Long, Long, Long>>
numElementsInBatchSpec;
@StateId(KEY_ID)
- private final StateSpec<Object, ValueState<K>> keySpec;
+ private final StateSpec<ValueState<K>> keySpec;
private final long prefetchFrequency;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 1f6afbf..6137a7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -446,7 +446,7 @@ public class ParDo {
Map<String, DoFnSignature.StateDeclaration> stateDeclarations = signature.stateDeclarations();
for (DoFnSignature.StateDeclaration stateDeclaration : stateDeclarations.values()) {
try {
- StateSpec<?, ?> stateSpec = (StateSpec<?, ?>) stateDeclaration.field().get(fn);
+ StateSpec<?> stateSpec = (StateSpec<?>) stateDeclaration.field().get(fn);
stateSpec.offerCoders(codersForStateSpecTypes(stateDeclaration, coderRegistry, inputCoder));
stateSpec.finishSpecifying();
} catch (IllegalAccessException e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 6fe37a1..48fa742 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -20,34 +20,36 @@ package org.apache.beam.sdk.util.state;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
* Visitor for binding a {@link StateSpec} and to the associated {@link State}.
- *
- * @param <K> the type of key this binder embodies.
*/
-public interface StateBinder<K> {
- <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder);
+public interface StateBinder {
+ <T> ValueState<T> bindValue(
+ String id, StateSpec<ValueState<T>> spec, Coder<T> coder);
- <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder);
+ <T> BagState<T> bindBag(
+ String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder);
- <T> SetState<T> bindSet(String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder);
+ <T> SetState<T> bindSet(
+ String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder);
<KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
+ String id,
+ StateSpec<MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder);
<InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
@@ -57,8 +59,8 @@ public interface StateBinder<K> {
* <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
* to the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ WatermarkHoldState bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState> spec,
+ StateSpec<WatermarkHoldState> spec,
TimestampCombiner timestampCombiner);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
index 6b94c40..8eda218 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
@@ -26,23 +26,22 @@ import org.apache.beam.sdk.coders.Coder;
* A specification of a persistent state cell. This includes information necessary to encode the
* value and details about the intended access pattern.
*
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- * accept values of type {@code StateSpec<? super K, StateT>}.
* @param <StateT> The type of state being described.
*/
@Experimental(Kind.STATE)
-public interface StateSpec<K, StateT extends State> extends Serializable {
+public interface StateSpec<StateT extends State> extends Serializable {
/**
* Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
*/
- StateT bind(String id, StateBinder<? extends K> binder);
+ StateT bind(String id, StateBinder binder);
/**
- * Given {code coders} are inferred from type arguments defined for this class.
- * Coders which are already set should take precedence over offered coders.
- * @param coders Array of coders indexed by the type arguments order. Entries might be null if
- * the coder could not be inferred.
+ * Given {code coders} are inferred from type arguments defined for this class. Coders which are
+ * already set should take precedence over offered coders.
+ *
+ * @param coders Array of coders indexed by the type arguments order. Entries might be null if the
+ * coder could not be inferred.
*/
void offerCoders(Coder[] coders);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index a057a0b..49d5722 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
@@ -42,12 +41,12 @@ public class StateSpecs {
private StateSpecs() {}
/** Create a simple state spec for values of type {@code T}. */
- public static <T> StateSpec<Object, ValueState<T>> value() {
+ public static <T> StateSpec<ValueState<T>> value() {
return new ValueStateSpec<>(null);
}
/** Create a simple state spec for values of type {@code T}. */
- public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) {
+ public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
return new ValueStateSpec<>(valueCoder);
}
@@ -57,17 +56,27 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
}
/**
+ * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
+ * multiple {@code InputT}s into a single {@code OutputT}.
+ */
+ public static <InputT, AccumT, OutputT>
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+ }
+
+ /**
* Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
@@ -80,11 +89,8 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
- checkArgument(accumCoder != null,
- "accumCoder should not be null. "
- + "Consider using combining(CombineFn<> combineFn) instead.");
return combiningInternal(accumCoder, combineFn);
}
@@ -96,7 +102,7 @@ public class StateSpecs {
* only be used to initialize static values.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
+ StateSpec<CombiningState<InputT, AccumT, OutputT>>
combiningFromInputInternal(
Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
@@ -113,13 +119,13 @@ public class StateSpecs {
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
@@ -128,7 +134,7 @@ public class StateSpecs {
* Create a state spec that is optimized for adding values frequently, and occasionally retrieving
* all the values that have been added.
*/
- public static <T> StateSpec<Object, BagState<T>> bag() {
+ public static <T> StateSpec<BagState<T>> bag() {
return bag(null);
}
@@ -136,49 +142,46 @@ public class StateSpecs {
* Create a state spec that is optimized for adding values frequently, and occasionally retrieving
* all the values that have been added.
*/
- public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) {
+ public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
return new BagStateSpec<>(elemCoder);
}
/**
* Create a state spec that supporting for {@link java.util.Set} like access patterns.
*/
- public static <T> StateSpec<Object, SetState<T>> set() {
+ public static <T> StateSpec<SetState<T>> set() {
return set(null);
}
/**
* Create a state spec that supporting for {@link java.util.Set} like access patterns.
*/
- public static <T> StateSpec<Object, SetState<T>> set(Coder<T> elemCoder) {
+ public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
return new SetStateSpec<>(elemCoder);
}
/**
* Create a state spec that supporting for {@link java.util.Map} like access patterns.
*/
- public static <K, V> StateSpec<Object, MapState<K, V>> map() {
+ public static <K, V> StateSpec<MapState<K, V>> map() {
return new MapStateSpec<>(null, null);
}
- /**
- * Create a state spec that supporting for {@link java.util.Map} like access patterns.
- */
- public static <K, V> StateSpec<Object, MapState<K, V>> map(Coder<K> keyCoder,
- Coder<V> valueCoder) {
+ /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */
+ public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
return new MapStateSpec<>(keyCoder, valueCoder);
}
/** Create a state spec for holding the watermark. */
- public static <W extends BoundedWindow>
- StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
+ public static
+ StateSpec<WatermarkHoldState> watermarkStateInternal(
TimestampCombiner timestampCombiner) {
- return new WatermarkStateSpecInternal<W>(timestampCombiner);
+ return new WatermarkStateSpecInternal(timestampCombiner);
}
- public static <K, InputT, AccumT, OutputT>
- StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+ public static <InputT, AccumT, OutputT>
+ StateSpec<BagState<AccumT>> convertToBagSpecInternal(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
if (combiningSpec instanceof CombiningStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@@ -201,7 +204,7 @@ public class StateSpecs {
*
* <p>Includes the coder for {@code T}.
*/
- private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> {
+ private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
@Nullable
private Coder<T> coder;
@@ -211,7 +214,7 @@ public class StateSpecs {
}
@Override
- public ValueState<T> bind(String id, StateBinder<?> visitor) {
+ public ValueState<T> bind(String id, StateBinder visitor) {
return visitor.bindValue(id, this, coder);
}
@@ -260,7 +263,7 @@ public class StateSpecs {
* <p>Includes the {@link CombineFn} and the coder for the accumulator type.
*/
private static class CombiningStateSpec<InputT, AccumT, OutputT>
- implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -275,7 +278,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
- String id, StateBinder<? extends Object> visitor) {
+ String id, StateBinder visitor) {
return visitor.bindCombining(id, this, accumCoder, combineFn);
}
@@ -320,7 +323,7 @@ public class StateSpecs {
return Objects.hash(getClass(), accumCoder);
}
- private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+ private StateSpec<BagState<AccumT>> asBagSpec() {
return new BagStateSpec<AccumT>(accumCoder);
}
}
@@ -332,7 +335,7 @@ public class StateSpecs {
* <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
*/
private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
- implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
@Nullable private Coder<AccumT> accumCoder;
private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
@@ -346,7 +349,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
- String id, StateBinder<? extends Object> visitor) {
+ String id, StateBinder visitor) {
return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
}
@@ -392,7 +395,7 @@ public class StateSpecs {
return Objects.hash(getClass(), accumCoder);
}
- private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+ private StateSpec<BagState<AccumT>> asBagSpec() {
return new BagStateSpec<AccumT>(accumCoder);
}
}
@@ -403,7 +406,7 @@ public class StateSpecs {
*
* <p>Includes the coder for the element type {@code T}</p>
*/
- private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> {
+ private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
@Nullable
private Coder<T> elemCoder;
@@ -413,7 +416,7 @@ public class StateSpecs {
}
@Override
- public BagState<T> bind(String id, StateBinder<?> visitor) {
+ public BagState<T> bind(String id, StateBinder visitor) {
return visitor.bindBag(id, this, elemCoder);
}
@@ -456,7 +459,7 @@ public class StateSpecs {
}
}
- private static class MapStateSpec<K, V> implements StateSpec<Object, MapState<K, V>> {
+ private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
@Nullable
private Coder<K> keyCoder;
@@ -469,7 +472,7 @@ public class StateSpecs {
}
@Override
- public MapState<K, V> bind(String id, StateBinder<?> visitor) {
+ public MapState<K, V> bind(String id, StateBinder visitor) {
return visitor.bindMap(id, this, keyCoder, valueCoder);
}
@@ -523,7 +526,7 @@ public class StateSpecs {
*
* <p>Includes the coder for the element type {@code T}</p>
*/
- private static class SetStateSpec<T> implements StateSpec<Object, SetState<T>> {
+ private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
@Nullable
private Coder<T> elemCoder;
@@ -533,7 +536,7 @@ public class StateSpecs {
}
@Override
- public SetState<T> bind(String id, StateBinder<?> visitor) {
+ public SetState<T> bind(String id, StateBinder visitor) {
return visitor.bindSet(id, this, elemCoder);
}
@@ -582,8 +585,7 @@ public class StateSpecs {
* <p>Includes the {@link TimestampCombiner} according to which the output times
* are combined.
*/
- private static class WatermarkStateSpecInternal<W extends BoundedWindow>
- implements StateSpec<Object, WatermarkHoldState> {
+ private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
/**
* When multiple output times are added to hold the watermark, this determines how they are
@@ -597,7 +599,7 @@ public class StateSpecs {
}
@Override
- public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
+ public WatermarkHoldState bind(String id, StateBinder visitor) {
return visitor.bindWatermark(id, this, timestampCombiner);
}
[6/6] beam git commit: This closes #2627: Remove extraneous type
variable from StateSpec
Posted by ke...@apache.org.
This closes #2627: Remove extraneous type variable from StateSpec
Make DoFnSignatures robust to StateSpec subclasses
Simplify type parameters of StateSpec and related
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b40b2650
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b40b2650
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b40b2650
Branch: refs/heads/master
Commit: b40b26501805fd6eabc1a5d9ffe60baa4a989bc7
Parents: eec903f 190422c
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 2 11:55:43 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 2 11:55:43 2017 -0700
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 4 +-
.../translation/utils/ApexStateInternals.java | 42 +++--
.../apex/translation/utils/NoOpStepContext.java | 2 +-
.../translation/utils/StateInternalsProxy.java | 6 +-
.../utils/ApexStateInternalsTest.java | 12 +-
.../construction/PTransformMatchersTest.java | 2 +-
.../beam/runners/core/BaseExecutionContext.java | 2 +-
.../beam/runners/core/ExecutionContext.java | 2 +-
.../GroupAlsoByWindowViaOutputBufferDoFn.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +-
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 2 +-
.../runners/core/InMemoryStateInternals.java | 36 ++--
.../runners/core/MergingActiveWindowSet.java | 4 +-
.../beam/runners/core/MergingStateAccessor.java | 2 +-
.../apache/beam/runners/core/NonEmptyPanes.java | 2 +-
.../beam/runners/core/PaneInfoTracker.java | 2 +-
.../runners/core/ReduceFnContextFactory.java | 37 ++--
.../beam/runners/core/ReduceFnRunner.java | 4 +-
.../beam/runners/core/SideInputHandler.java | 14 +-
.../beam/runners/core/SimpleDoFnRunner.java | 8 +-
.../beam/runners/core/SimpleOldDoFnRunner.java | 2 +-
.../beam/runners/core/SplittableParDo.java | 8 +-
.../apache/beam/runners/core/StateAccessor.java | 2 +-
.../beam/runners/core/StateInternals.java | 8 +-
.../runners/core/StateInternalsFactory.java | 2 +-
.../apache/beam/runners/core/StateMerging.java | 16 +-
.../apache/beam/runners/core/StateTable.java | 10 +-
.../org/apache/beam/runners/core/StateTag.java | 28 ++-
.../org/apache/beam/runners/core/StateTags.java | 70 ++++----
.../beam/runners/core/StatefulDoFnRunner.java | 6 +-
.../beam/runners/core/SystemReduceFn.java | 8 +-
.../core/TestInMemoryStateInternals.java | 6 +-
.../apache/beam/runners/core/WatermarkHold.java | 8 +-
.../beam/runners/core/WindowingInternals.java | 2 +-
.../AfterDelayFromFirstElementStateMachine.java | 2 +-
.../core/triggers/AfterPaneStateMachine.java | 2 +-
.../TriggerStateMachineContextFactory.java | 12 +-
.../triggers/TriggerStateMachineRunner.java | 2 +-
.../core/GroupAlsoByWindowsProperties.java | 10 +-
.../core/InMemoryStateInternalsTest.java | 16 +-
.../core/MergingActiveWindowSetTest.java | 2 +-
.../beam/runners/core/ReduceFnTester.java | 18 +-
.../beam/runners/core/SplittableParDoTest.java | 2 +-
.../apache/beam/runners/core/StateTagTest.java | 62 +++----
.../runners/core/StatefulDoFnRunnerTest.java | 4 +-
.../CopyOnAccessInMemoryStateInternals.java | 118 ++++++-------
.../runners/direct/DirectExecutionContext.java | 15 +-
.../beam/runners/direct/EvaluationContext.java | 8 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 6 +-
.../beam/runners/direct/ParDoEvaluator.java | 2 +-
...littableProcessElementsEvaluatorFactory.java | 2 +-
.../direct/StatefulParDoEvaluatorFactory.java | 5 +-
.../runners/direct/StepTransformResult.java | 4 +-
.../beam/runners/direct/TransformResult.java | 2 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 106 +++++------
.../beam/runners/direct/DirectRunnerTest.java | 1 +
.../runners/direct/EvaluationContextTest.java | 12 +-
.../StatefulParDoEvaluatorFactoryTest.java | 10 +-
.../functions/FlinkNoOpStepContext.java | 2 +-
.../functions/FlinkStatefulDoFnFunction.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 6 +-
.../streaming/SplittableDoFnOperator.java | 4 +-
.../wrappers/streaming/WindowDoFnOperator.java | 4 +-
.../state/FlinkBroadcastStateInternals.java | 45 +++--
.../state/FlinkKeyGroupStateInternals.java | 29 ++-
.../state/FlinkSplitStateInternals.java | 29 ++-
.../streaming/state/FlinkStateInternals.java | 48 ++---
.../flink/streaming/DoFnOperatorTest.java | 4 +-
.../FlinkBroadcastStateInternalsTest.java | 6 +-
.../FlinkKeyGroupStateInternalsTest.java | 2 +-
.../streaming/FlinkSplitStateInternalsTest.java | 2 +-
.../streaming/FlinkStateInternalsTest.java | 12 +-
.../BatchStatefulParDoOverridesTest.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../spark/stateful/SparkStateInternals.java | 44 +++--
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 2 +-
.../spark/translation/SparkProcessContext.java | 2 +-
.../spark/translation/TranslationUtils.java | 2 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 8 +-
.../beam/sdk/transforms/GroupIntoBatches.java | 6 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 2 +-
.../sdk/transforms/reflect/DoFnSignatures.java | 25 ++-
.../apache/beam/sdk/util/state/StateBinder.java | 28 +--
.../apache/beam/sdk/util/state/StateSpec.java | 15 +-
.../apache/beam/sdk/util/state/StateSpecs.java | 92 +++++-----
.../apache/beam/sdk/transforms/ParDoTest.java | 177 +++++++++----------
.../transforms/reflect/DoFnInvokersTest.java | 2 +-
.../transforms/reflect/DoFnSignaturesTest.java | 26 +--
.../beam/fn/harness/fake/FakeStepContext.java | 2 +-
89 files changed, 711 insertions(+), 707 deletions(-)
----------------------------------------------------------------------
[4/6] beam git commit: Simplify type parameters of StateSpec and
related
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
index 315110d..a056937 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -50,11 +50,11 @@ import org.joda.time.Instant;
public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
private final WindowFn<?, W> windowFn;
- private StateInternals<?> stateInternals;
+ private StateInternals stateInternals;
private final Coder<W> windowCoder;
- public TriggerStateMachineContextFactory(WindowFn<?, W> windowFn,
- StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
+ public TriggerStateMachineContextFactory(
+ WindowFn<?, W> windowFn, StateInternals stateInternals, ActiveWindowSet<W> activeWindows) {
// Future triggers may be able to exploit the active window to state address window mapping.
this.windowFn = windowFn;
this.stateInternals = stateInternals;
@@ -263,7 +263,7 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
}
@Override
- public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
+ public <StateT extends State> StateT access(StateTag<StateT> address) {
return stateInternals.state(windowNamespace, address);
}
}
@@ -280,13 +280,13 @@ public class TriggerStateMachineContextFactory<W extends BoundedWindow> {
@Override
public <StateT extends State> StateT access(
- StateTag<? super Object, StateT> address) {
+ StateTag<StateT> address) {
return stateInternals.state(windowNamespace, address);
}
@Override
public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super Object, StateT> address) {
+ StateTag<StateT> address) {
ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
for (W mergingWindow : activeToBeMerged) {
StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index e26241a..fc2f696 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
*/
public class TriggerStateMachineRunner<W extends BoundedWindow> {
@VisibleForTesting
- public static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+ public static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG =
StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
private final ExecutableTriggerStateMachine rootTrigger;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index bc33366..054a2e2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -570,7 +570,7 @@ public class GroupAlsoByWindowsProperties {
}
private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
- private final LoadingCache<K, StateInternals<K>> stateInternalsCache;
+ private final LoadingCache<K, StateInternals> stateInternalsCache;
private CachingStateInternalsFactory() {
this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
@@ -578,7 +578,7 @@ public class GroupAlsoByWindowsProperties {
@Override
@SuppressWarnings("unchecked")
- public StateInternals<K> stateInternalsForKey(K key) {
+ public StateInternals stateInternalsForKey(K key) {
try {
return stateInternalsCache.get(key);
} catch (Exception exc) {
@@ -587,9 +587,9 @@ public class GroupAlsoByWindowsProperties {
}
}
- private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> {
+ private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals> {
@Override
- public StateInternals<K> load(K key) throws Exception {
+ public StateInternals load(K key) throws Exception {
return InMemoryStateInternals.forKey(key);
}
}
@@ -686,7 +686,7 @@ public class GroupAlsoByWindowsProperties {
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 6248401..16f7f26 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -60,22 +60,22 @@ public class InMemoryStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ private static final StateTag<CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, SetState<String>> STRING_SET_ADDR =
+ private static final StateTag<SetState<String>> STRING_SET_ADDR =
StateTags.set("stringSet", StringUtf8Coder.of());
- private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR =
+ private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
index 95d6977..7a83a18 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -45,7 +45,7 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class MergingActiveWindowSetTest {
private Sessions windowFn;
- private StateInternals<String> state;
+ private StateInternals state;
private MergingActiveWindowSet<IntervalWindow> set;
private ActiveWindowSet.MergeCallback<IntervalWindow> callback;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index eba0f67..573855f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -314,14 +314,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
+ ImmutableSet.<StateTag<?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
}
@SafeVarargs
public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(
+ ImmutableSet.<StateTag<?>>of(
TriggerStateMachineRunner.FINISHED_BITS_TAG,
PaneInfoTracker.PANE_INFO_TAG,
WatermarkHold.watermarkHoldTagForTimestampCombiner(
@@ -331,14 +331,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public final void assertHasOnlyGlobalState() {
assertHasOnlyGlobalAndAllowedTags(
- Collections.<W>emptySet(), Collections.<StateTag<? super String, ?>>emptySet());
+ Collections.<W>emptySet(), Collections.<StateTag<?>>emptySet());
}
@SafeVarargs
public final void assertHasOnlyGlobalAndPaneInfoFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(
+ ImmutableSet.<StateTag<?>>of(
PaneInfoTracker.PANE_INFO_TAG,
WatermarkHold.watermarkHoldTagForTimestampCombiner(
objectStrategy.getTimestampCombiner()),
@@ -350,30 +350,30 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
* {@code expectedWindows} and that each of these windows has only tags from {@code allowedTags}.
*/
private void assertHasOnlyGlobalAndAllowedTags(
- Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) {
+ Set<W> expectedWindows, Set<StateTag<?>> allowedTags) {
Set<StateNamespace> expectedWindowsSet = new HashSet<>();
for (W expectedWindow : expectedWindows) {
expectedWindowsSet.add(windowNamespace(expectedWindow));
}
- Map<StateNamespace, Set<StateTag<? super String, ?>>> actualWindows = new HashMap<>();
+ Map<StateNamespace, Set<StateTag<?>>> actualWindows = new HashMap<>();
for (StateNamespace namespace : stateInternals.getNamespacesInUse()) {
if (namespace instanceof StateNamespaces.GlobalNamespace) {
continue;
} else if (namespace instanceof StateNamespaces.WindowNamespace) {
- Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+ Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
if (tagsInUse.isEmpty()) {
continue;
}
actualWindows.put(namespace, tagsInUse);
- Set<StateTag<? super String, ?>> unexpected = Sets.difference(tagsInUse, allowedTags);
+ Set<StateTag<?>> unexpected = Sets.difference(tagsInUse, allowedTags);
if (unexpected.isEmpty()) {
continue;
} else {
fail(namespace + " has unexpected states: " + tagsInUse);
}
} else if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
- Set<StateTag<? super String, ?>> tagsInUse = stateInternals.getTagsInUse(namespace);
+ Set<StateTag<?>> tagsInUse = stateInternals.getTagsInUse(namespace);
assertTrue(namespace + " contains " + tagsInUse, tagsInUse.isEmpty());
} else {
fail("Unrecognized namespace " + namespace);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 1a44453..a67db6d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -231,7 +231,7 @@ public class SplittableParDoTest {
processFn.setStateInternalsFactory(
new StateInternalsFactory<String>() {
@Override
- public StateInternals<String> stateInternalsForKey(String key) {
+ public StateInternals stateInternalsForKey(String key) {
return stateInternals;
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 9a8b75c..fc08dcc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -41,10 +41,10 @@ import org.junit.runners.JUnit4;
public class StateTagTest {
@Test
public void testValueEquality() {
- StateTag<?, ?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
- StateTag<?, ?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
- StateTag<?, ?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
- StateTag<?, ?> barVarInt = StateTags.value("bar", VarIntCoder.of());
+ StateTag<?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of());
+ StateTag<?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of());
+ StateTag<?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of());
+ StateTag<?> barVarInt = StateTags.value("bar", VarIntCoder.of());
assertEquals(fooVarInt1, fooVarInt2);
assertNotEquals(fooVarInt1, fooBigEndian);
@@ -53,10 +53,10 @@ public class StateTagTest {
@Test
public void testBagEquality() {
- StateTag<?, ?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
- StateTag<?, ?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
- StateTag<?, ?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
- StateTag<?, ?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
+ StateTag<?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of());
+ StateTag<?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of());
+ StateTag<?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of());
+ StateTag<?> barVarInt = StateTags.bag("bar", VarIntCoder.of());
assertEquals(fooVarInt1, fooVarInt2);
assertNotEquals(fooVarInt1, fooBigEndian);
@@ -65,10 +65,10 @@ public class StateTagTest {
@Test
public void testSetEquality() {
- StateTag<?, ?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
- StateTag<?, ?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
- StateTag<?, ?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
- StateTag<?, ?> barVarInt = StateTags.set("bar", VarIntCoder.of());
+ StateTag<?> fooVarInt1 = StateTags.set("foo", VarIntCoder.of());
+ StateTag<?> fooVarInt2 = StateTags.set("foo", VarIntCoder.of());
+ StateTag<?> fooBigEndian = StateTags.set("foo", BigEndianIntegerCoder.of());
+ StateTag<?> barVarInt = StateTags.set("bar", VarIntCoder.of());
assertEquals(fooVarInt1, fooVarInt2);
assertNotEquals(fooVarInt1, fooBigEndian);
@@ -77,15 +77,15 @@ public class StateTagTest {
@Test
public void testMapEquality() {
- StateTag<?, ?> fooStringVarInt1 =
+ StateTag<?> fooStringVarInt1 =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
- StateTag<?, ?> fooStringVarInt2 =
+ StateTag<?> fooStringVarInt2 =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
- StateTag<?, ?> fooStringBigEndian =
+ StateTag<?> fooStringBigEndian =
StateTags.map("foo", StringUtf8Coder.of(), BigEndianIntegerCoder.of());
- StateTag<?, ?> fooVarIntBigEndian =
+ StateTag<?> fooVarIntBigEndian =
StateTags.map("foo", VarIntCoder.of(), BigEndianIntegerCoder.of());
- StateTag<?, ?> barStringVarInt =
+ StateTag<?> barStringVarInt =
StateTags.map("bar", StringUtf8Coder.of(), VarIntCoder.of());
assertEquals(fooStringVarInt1, fooStringVarInt2);
@@ -97,11 +97,11 @@ public class StateTagTest {
@Test
public void testWatermarkBagEquality() {
- StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
- StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
+ StateTag<?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
+ StateTag<?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST);
- StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
+ StateTag<?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST);
// Same id, same fn.
assertEquals(foo1, foo2);
@@ -119,12 +119,12 @@ public class StateTagTest {
Coder<Integer> input2 = BigEndianIntegerCoder.of();
Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers();
- StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
- StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
- StateTag<?, ?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
+ StateTag<?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+ StateTag<?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn);
+ StateTag<?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn);
- StateTag<?, ?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
- StateTag<?, ?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
+ StateTag<?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn);
+ StateTag<?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn);
// Same name, coder and combineFn
assertEquals(fooCoder1Max1, fooCoder1Max2);
@@ -162,16 +162,16 @@ public class StateTagTest {
Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
- StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder1Max1 = StateTags.combiningValueWithContext(
"foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
- StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder1Max2 = StateTags.combiningValueWithContext(
"foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
- StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder1Min = StateTags.combiningValueWithContext(
"foo", accum1, CombineFnUtil.toFnWithContext(minFn));
- StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext(
+ StateTag<?> fooCoder2Max = StateTags.combiningValueWithContext(
"foo", accum2, CombineFnUtil.toFnWithContext(maxFn));
- StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext(
+ StateTag<?> barCoder1Max = StateTags.combiningValueWithContext(
"bar", accum1, CombineFnUtil.toFnWithContext(maxFn));
// Same name, coder and combineFn
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index aeaa63b..f80643a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -131,7 +131,7 @@ public class StatefulDoFnRunnerTest {
timerInternals.advanceInputWatermark(new Instant(1L));
MyDoFn fn = new MyDoFn();
- StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
+ StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState);
DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner(
fn,
@@ -227,7 +227,7 @@ public class StatefulDoFnRunnerTest {
public final String stateId = "foo";
@StateId(stateId)
- public final StateSpec<Object, ValueState<Integer>> intState =
+ public final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 92d87b5..ef3a053 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -60,24 +60,25 @@ import org.joda.time.Instant;
* of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
* accessed, an independent copy will be created within this table.
*/
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> {
- private final K key;
- private final CopyOnAccessInMemoryStateTable<K> table;
+public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
+ private final CopyOnAccessInMemoryStateTable table;
+
+ private K key;
/**
* Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null)
* StateInternals.
*/
- public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying(
- K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) {
- return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
+ public static <K> CopyOnAccessInMemoryStateInternals withUnderlying(
+ K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) {
+ return new CopyOnAccessInMemoryStateInternals<>(key, underlying);
}
private CopyOnAccessInMemoryStateInternals(
- K key, CopyOnAccessInMemoryStateInternals<K> underlying) {
+ K key, CopyOnAccessInMemoryStateInternals underlying) {
this.key = key;
table =
- new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table);
+ new CopyOnAccessInMemoryStateTable(underlying == null ? null : underlying.table);
}
/**
@@ -94,7 +95,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
*
* @return this table
*/
- public CopyOnAccessInMemoryStateInternals<K> commit() {
+ public CopyOnAccessInMemoryStateInternals commit() {
table.commit();
return this;
}
@@ -116,18 +117,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@Override
public <T extends State> T state(
- StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
+ StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
return table.get(namespace, address, c);
}
@Override
- public K getKey() {
+ public Object getKey() {
return key;
}
@@ -140,9 +141,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
* {@link StateTable#get(StateNamespace, StateTag, StateContext)}, first attempts to obtain a
* copy of existing {@link State} from an underlying {@link StateTable}.
*/
- private static class CopyOnAccessInMemoryStateTable<K> extends StateTable<K> {
- private final K key;
- private Optional<StateTable<K>> underlying;
+ private static class CopyOnAccessInMemoryStateTable extends StateTable {
+ private Optional<StateTable> underlying;
/**
* The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}.
@@ -162,17 +162,16 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
* when a {@link StateTag} is bound.</li>
* </ul>
*/
- private StateBinderFactory<K> binderFactory;
+ private StateBinderFactory binderFactory;
/**
* The earliest watermark hold in this table.
*/
private Optional<Instant> earliestWatermarkHold;
- public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) {
- this.key = key;
+ public CopyOnAccessInMemoryStateTable(StateTable underlying) {
this.underlying = Optional.fromNullable(underlying);
- binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying);
+ binderFactory = new CopyOnBindBinderFactory(this.underlying);
earliestWatermarkHold = Optional.absent();
}
@@ -191,7 +190,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
private void commit() {
Instant earliestHold = getEarliestWatermarkHold();
if (underlying.isPresent()) {
- ReadThroughBinderFactory<K> readThroughBinder =
+ ReadThroughBinderFactory readThroughBinder =
new ReadThroughBinderFactory<>(underlying.get());
binderFactory = readThroughBinder;
Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
@@ -201,7 +200,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
earliestWatermarkHold = Optional.of(earliestHold);
clearEmpty();
- binderFactory = new InMemoryStateBinderFactory<>(key);
+ binderFactory = new InMemoryStateBinderFactory();
underlying = Optional.absent();
}
@@ -246,37 +245,35 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- protected StateBinder<K> binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
+ protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext<?> c) {
return binderFactory.forNamespace(namespace, c);
}
- private interface StateBinderFactory<K> {
- StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c);
+ private interface StateBinderFactory {
+ StateBinder forNamespace(StateNamespace namespace, StateContext<?> c);
}
/**
* {@link StateBinderFactory} that creates a copy of any existing state when the state is bound.
*/
- private static class CopyOnBindBinderFactory<K> implements StateBinderFactory<K> {
- private final K key;
- private final Optional<StateTable<K>> underlying;
+ private static class CopyOnBindBinderFactory implements StateBinderFactory {
+ private final Optional<StateTable> underlying;
- public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) {
- this.key = key;
+ public CopyOnBindBinderFactory(Optional<StateTable> underlying) {
this.underlying = underlying;
}
- private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) {
+ private boolean containedInUnderlying(StateNamespace namespace, StateTag<?> tag) {
return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace)
&& underlying.get().getTagsInUse(namespace).containsKey(tag);
}
@Override
- public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateBinder<K>() {
+ public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+ return new StateBinder() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
@@ -291,7 +288,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends ValueState<T>> existingState =
@@ -306,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
if (containedInUnderlying(namespace, address)) {
@@ -322,7 +319,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends BagState<T>> existingState =
@@ -336,7 +333,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
if (containedInUnderlying(namespace, address)) {
@SuppressWarnings("unchecked")
InMemoryState<? extends SetState<T>> existingState =
@@ -350,7 +347,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> address,
+ StateTag<MapState<KeyT, ValueT>> address,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
if (containedInUnderlying(namespace, address)) {
@@ -367,7 +364,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -381,17 +378,17 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
* to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from
* the underlying table.
*/
- private static class ReadThroughBinderFactory<K> implements StateBinderFactory<K> {
- private final StateTable<K> underlying;
+ private static class ReadThroughBinderFactory<K> implements StateBinderFactory {
+ private final StateTable underlying;
- public ReadThroughBinderFactory(StateTable<K> underlying) {
+ public ReadThroughBinderFactory(StateTable underlying) {
this.underlying = underlying;
}
- public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) {
+ public Instant readThroughAndGetEarliestHold(StateTable readTo) {
Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (StateNamespace namespace : underlying.getNamespacesInUse()) {
- for (Map.Entry<StateTag<? super K, ?>, ? extends State> existingState :
+ for (Map.Entry<StateTag<?>, ? extends State> existingState :
underlying.getTagsInUse(namespace).entrySet()) {
if (!((InMemoryState<?>) existingState.getValue()).isCleared()) {
// Only read through non-cleared values to ensure that completed windows are
@@ -412,44 +409,44 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
@Override
- public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateBinder<K>() {
+ public StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
+ return new StateBinder() {
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return underlying.get(namespace, address, c);
}
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return underlying.get(namespace, address, c);
}
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return underlying.get(namespace, address, c);
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return underlying.get(namespace, address, c);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
return underlying.get(namespace, address, c);
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> address,
+ StateTag<MapState<KeyT, ValueT>> address,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
return underlying.get(namespace, address, c);
}
@@ -457,7 +454,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return bindCombiningValue(
@@ -467,16 +464,13 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
}
}
- private static class InMemoryStateBinderFactory<K> implements StateBinderFactory<K> {
- private final K key;
+ private static class InMemoryStateBinderFactory implements StateBinderFactory {
- public InMemoryStateBinderFactory(K key) {
- this.key = key;
- }
+ public InMemoryStateBinderFactory() {}
@Override
- public StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
- return new InMemoryStateBinder<>(key, c);
+ public StateBinder forNamespace(StateNamespace namespace, StateContext<?> c) {
+ return new InMemoryStateBinder(c);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index 1108f0d..107f39a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -34,11 +34,14 @@ class DirectExecutionContext
extends BaseExecutionContext<DirectStepContext> {
private final Clock clock;
private final StructuralKey<?> key;
- private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+ private final CopyOnAccessInMemoryStateInternals existingState;
private final TransformWatermarks watermarks;
- public DirectExecutionContext(Clock clock, StructuralKey<?> key,
- CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
+ public DirectExecutionContext(
+ Clock clock,
+ StructuralKey<?> key,
+ CopyOnAccessInMemoryStateInternals existingState,
+ TransformWatermarks watermarks) {
this.clock = clock;
this.key = key;
this.existingState = existingState;
@@ -55,7 +58,7 @@ class DirectExecutionContext
*/
public class DirectStepContext
extends BaseExecutionContext.StepContext {
- private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+ private CopyOnAccessInMemoryStateInternals<?> stateInternals;
private DirectTimerInternals timerInternals;
public DirectStepContext(
@@ -64,7 +67,7 @@ class DirectExecutionContext
}
@Override
- public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+ public CopyOnAccessInMemoryStateInternals<?> stateInternals() {
if (stateInternals == null) {
stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
}
@@ -84,7 +87,7 @@ class DirectExecutionContext
* Commits the state of this step, and returns the committed state. If the step has not
* accessed any state, return null.
*/
- public CopyOnAccessInMemoryStateInternals<?> commitState() {
+ public CopyOnAccessInMemoryStateInternals commitState() {
if (stateInternals != null) {
return stateInternals.commit();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 54ce027..f6d9a36 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -90,7 +90,7 @@ class EvaluationContext {
private final WatermarkCallbackExecutor callbackExecutor;
/** The stateInternals of the world, by applied PTransform and key. */
- private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
+ private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals>
applicationStateInternals;
private final SideInputContainer sideInputContainer;
@@ -179,9 +179,9 @@ class EvaluationContext {
result.getAggregatorChanges().commit();
}
// Update state internals
- CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
+ CopyOnAccessInMemoryStateInternals theirState = result.getState();
if (theirState != null) {
- CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
+ CopyOnAccessInMemoryStateInternals committedState = theirState.commit();
StepAndKey stepAndKey =
StepAndKey.of(
result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
@@ -331,7 +331,7 @@ class EvaluationContext {
return new DirectExecutionContext(
clock,
key,
- (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
+ (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey),
watermarkManager.getWatermarks(application));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index f1e29c6..9f567a4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -164,8 +164,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
(PCollection<KV<K, Iterable<V>>>)
Iterables.getOnlyElement(application.getOutputs().values()));
outputBundles.add(bundle);
- CopyOnAccessInMemoryStateInternals<K> stateInternals =
- (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
+ CopyOnAccessInMemoryStateInternals stateInternals =
+ (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
DirectTimerInternals timerInternals = stepContext.timerInternals();
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
new ReduceFnRunner<>(
@@ -191,7 +191,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
// State is initialized within the constructor. It can never be null.
- CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+ CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
return StepTransformResult.<KeyedWorkItem<K, V>>withHold(
application, state.getEarliestWatermarkHold())
.withState(state)
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index cab11db..053da31 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -211,7 +211,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
throw UserCodeException.wrap(e);
}
StepTransformResult.Builder<InputT> resultBuilder;
- CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+ CopyOnAccessInMemoryStateInternals state = stepContext.commitState();
if (state != null) {
resultBuilder =
StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 7efdb52..e0adc40 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -127,7 +127,7 @@ class SplittableProcessElementsEvaluatorFactory<
new StateInternalsFactory<String>() {
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public StateInternals<String> stateInternalsForKey(String key) {
+ public StateInternals stateInternalsForKey(String key) {
return (StateInternals) stepContext.stateInternals();
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 8793ae8..93ab077 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -175,10 +175,11 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
@Override
public void run() {
for (StateDeclaration stateDecl : signature.stateDeclarations().values()) {
- StateTag<Object, ?> tag;
+ StateTag<?> tag;
try {
tag =
- StateTags.tagForSpec(stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
+ StateTags.tagForSpec(
+ stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
} catch (IllegalAccessException e) {
throw new RuntimeException(
String.format(
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 01b2a72..fe3ae97 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -70,7 +70,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder;
private MetricUpdates metricUpdates;
- private CopyOnAccessInMemoryStateInternals<?> state;
+ private CopyOnAccessInMemoryStateInternals state;
private TimerUpdate timerUpdate;
private AggregatorContainer.Mutator aggregatorChanges;
private final Set<OutputType> producedOutputs;
@@ -109,7 +109,7 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
return this;
}
- public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) {
+ public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals state) {
this.state = state;
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 8bb5f93..bde44ca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -85,7 +85,7 @@ public interface TransformResult<InputT> {
* <p>If this evaluation did not access state, this may return null.
*/
@Nullable
- CopyOnAccessInMemoryStateInternals<?> getState();
+ CopyOnAccessInMemoryStateInternals getState();
/**
* Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 4d04745..3e29a69 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -72,7 +72,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -92,7 +92,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -114,18 +114,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
*/
@Test
public void testGetWithPresentInUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
+ StateTag<ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), nullValue(String.class));
underlyingValue.write("bar");
assertThat(underlyingValue.read(), equalTo("bar"));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
ValueState<String> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), equalTo("bar"));
@@ -140,18 +140,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testBagStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
+ StateTag<BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
BagState<Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), emptyIterable());
underlyingValue.add(1);
assertThat(underlyingValue.read(), containsInAnyOrder(1));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -166,18 +166,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testSetStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
+ StateTag<SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
SetState<Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), emptyIterable());
underlyingValue.add(1);
assertThat(underlyingValue.read(), containsInAnyOrder(1));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
@@ -192,11 +192,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testMapStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, MapState<String, Integer>> valueTag =
+ StateTag<MapState<String, Integer>> valueTag =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.entries().read(), emptyIterable());
@@ -204,7 +204,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
underlyingValue.put("hello", 1);
assertThat(underlyingValue.get("hello").read(), equalTo(1));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
@@ -221,13 +221,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
- StateTag<Object, CombiningState<Long, long[], Long>> stateTag =
+ StateTag<CombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
@@ -236,7 +236,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
underlyingValue.add(1L);
assertThat(underlyingValue.read(), equalTo(1L));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(1L));
@@ -251,13 +251,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testWatermarkHoldStateWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, WatermarkHoldState> stateTag =
+ StateTag<WatermarkHoldState> stateTag =
StateTags.watermarkStateInternal("wmstate", timestampCombiner);
WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), nullValue());
@@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
underlyingValue.add(new Instant(250L));
assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
@@ -284,10 +284,10 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithoutUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -304,13 +304,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -331,15 +331,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithClearedInUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
+ CopyOnAccessInMemoryStateInternals<String>secondUnderlying =
spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -361,13 +361,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithOverwrittenUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -392,15 +392,15 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithAddedUnderlying() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
internals.commit();
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -416,7 +416,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithEmptyTableIsEmpty() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
internals.commit();
@@ -426,11 +426,11 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithOnlyClearedValuesIsEmpty() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -444,13 +444,13 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
- StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
+ StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
@@ -475,16 +475,16 @@ public class CopyOnAccessInMemoryStateInternalsTest {
return new Instant(689743L);
}
};
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
internals.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -508,18 +508,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
return new Instant(689743L);
}
};
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -545,18 +545,18 @@ public class CopyOnAccessInMemoryStateInternalsTest {
return new Instant(689743L);
}
};
- CopyOnAccessInMemoryStateInternals<String> underlying =
+ CopyOnAccessInMemoryStateInternals<String>underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
- StateTag<Object, WatermarkHoldState> firstHoldAddress =
+ StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(224L));
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
- StateTag<Object, WatermarkHoldState> secondHoldAddress =
+ StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
@@ -568,7 +568,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
@Test
public void testGetEarliestHoldBeforeCommit() {
- CopyOnAccessInMemoryStateInternals<String> internals =
+ CopyOnAccessInMemoryStateInternals<String>internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
internals
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 51ae12a..6f9adc4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -524,6 +524,7 @@ public class DirectRunnerTest implements Serializable {
}
private static class LongNoDecodeCoder extends CustomCoder<Long> {
+
@Override
public void encode(
Long value, OutputStream outStream, Context context) throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 0c3a8ed..bfbcd79 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -161,7 +161,7 @@ public class EvaluationContextTest {
context.getExecutionContext(createdProducer,
StructuralKey.of("foo", StringUtf8Coder.of()));
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
@@ -194,7 +194,7 @@ public class EvaluationContextTest {
context.getExecutionContext(createdProducer,
StructuralKey.of("foo", StringUtf8Coder.of()));
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
fooContext
.getOrCreateStepContext("s1", "s1")
@@ -221,7 +221,7 @@ public class EvaluationContextTest {
DirectExecutionContext fooContext =
context.getExecutionContext(createdProducer, myKey);
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
fooContext
.getOrCreateStepContext("s1", "s1")
@@ -246,9 +246,9 @@ public class EvaluationContextTest {
DirectExecutionContext fooContext =
context.getExecutionContext(downstreamProducer, myKey);
- StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+ StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
- CopyOnAccessInMemoryStateInternals<Object> state =
+ CopyOnAccessInMemoryStateInternals<?> state =
fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
bag.add(1);
@@ -268,7 +268,7 @@ public class EvaluationContextTest {
DirectExecutionContext afterResultContext =
context.getExecutionContext(downstreamProducer, myKey);
- CopyOnAccessInMemoryStateInternals<Object> afterResultState =
+ CopyOnAccessInMemoryStateInternals<?> afterResultState =
afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index ecb8130..fc63406 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -92,7 +92,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
@Mock private transient UncommittedBundle<Integer> mockUncommittedBundle;
private static final String KEY = "any-key";
- private transient StateInternals<Object> stateInternals =
+ private transient StateInternals stateInternals =
CopyOnAccessInMemoryStateInternals.<Object>withUnderlying(KEY, null);
private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
@@ -104,7 +104,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
- when((StateInternals<Object>) mockStepContext.stateInternals()).thenReturn(stateInternals);
+ when((StateInternals) mockStepContext.stateInternals()).thenReturn(stateInternals);
when(mockEvaluationContext.createSideInputReader(anyList()))
.thenReturn(
SideInputContainer.create(
@@ -133,7 +133,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
ParDo.of(
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> spec =
+ private final StateSpec<ValueState<String>> spec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
@@ -165,7 +165,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
StateNamespaces.window(IntervalWindow.getCoder(), firstWindow);
StateNamespace secondWindowNamespace =
StateNamespaces.window(IntervalWindow.getCoder(), secondWindow);
- StateTag<Object, ValueState<String>> tag =
+ StateTag<ValueState<String>> tag =
StateTags.tagForSpec(stateId, StateSpecs.value(StringUtf8Coder.of()));
// Set up non-empty state. We don't mock + verify calls to clear() but instead
@@ -247,7 +247,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
.of(
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> spec =
+ private final StateSpec<ValueState<String>> spec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index 847a00a..8640801 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -61,7 +61,7 @@ public class FlinkNoOpStepContext implements StepContext {
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 879fad7..a79f856 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -123,7 +123,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
Collections.<TupleTag<?>>emptyList(),
new FlinkNoOpStepContext() {
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return stateInternals;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 01830de..d8fd79a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -133,7 +133,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient long currentOutputWatermark;
- private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
+ private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
protected transient FlinkStateInternals<?> stateInternals;
@@ -149,7 +149,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient FlinkTimerInternals timerInternals;
- private transient StateInternals<?> pushbackStateInternals;
+ private transient StateInternals pushbackStateInternals;
private transient Optional<Long> pushedBackWatermark;
@@ -673,7 +673,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return stateInternals;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index fb6762d..1887a99 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -94,10 +94,10 @@ public class SplittableDoFnOperator<
StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
@Override
- public StateInternals<String> stateInternalsForKey(String key) {
+ public StateInternals stateInternalsForKey(String key) {
//this will implicitly be keyed by the key of the incoming
// element or by the key of a firing timer
- return (StateInternals<String>) stateInternals;
+ return (StateInternals) stateInternals;
}
};
TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9718734..3899303 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -83,10 +83,10 @@ public class WindowDoFnOperator<K, InputT, OutputT>
protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
@Override
- public StateInternals<K> stateInternalsForKey(K key) {
+ public StateInternals stateInternalsForKey(K key) {
//this will implicitly be keyed by the key of the incoming
// element or by the key of a firing timer
- return (StateInternals<K>) stateInternals;
+ return (StateInternals) stateInternals;
}
};
TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
[2/6] beam git commit: Simplify type parameters of StateSpec and
related
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52b2f5e..26904aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1625,7 +1625,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1654,7 +1654,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<Integer, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> seenSpec =
+ private final StateSpec<ValueState<Integer>> seenSpec =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1704,7 +1704,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, MyInteger>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<MyInteger>> intState =
+ private final StateSpec<ValueState<MyInteger>> intState =
StateSpecs.value();
@ProcessElement
@@ -1734,7 +1734,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, MyInteger>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<MyInteger>> intState =
+ private final StateSpec<ValueState<MyInteger>> intState =
StateSpecs.value();
@ProcessElement
@@ -1765,7 +1765,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, MyInteger>, MyInteger>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<MyInteger>> intState =
+ private final StateSpec<ValueState<MyInteger>> intState =
StateSpecs.value();
@ProcessElement
@@ -1797,7 +1797,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<List<MyInteger>>> intState =
+ private final StateSpec<ValueState<List<MyInteger>>> intState =
StateSpecs.value();
@ProcessElement
@@ -1828,7 +1828,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1876,7 +1876,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, KV<String, Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1892,7 +1892,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1929,7 +1929,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Integer>() {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> intState =
+ private final StateSpec<ValueState<Integer>> intState =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -1976,7 +1976,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<Integer>> bufferState =
+ private final StateSpec<BagState<Integer>> bufferState =
StateSpecs.bag(VarIntCoder.of());
@ProcessElement
@@ -2013,7 +2013,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<MyInteger>> bufferState =
+ private final StateSpec<BagState<MyInteger>> bufferState =
StateSpecs.bag();
@ProcessElement
@@ -2051,7 +2051,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<MyInteger>> bufferState =
+ private final StateSpec<BagState<MyInteger>> bufferState =
StateSpecs.bag();
@ProcessElement
@@ -2088,10 +2088,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Set<Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, SetState<Integer>> setState =
+ private final StateSpec<SetState<Integer>> setState =
StateSpecs.set(VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2132,10 +2132,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Set<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+ private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2175,10 +2175,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, Set<MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set();
+ private final StateSpec<SetState<MyInteger>> setState = StateSpecs.set();
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2217,10 +2217,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, MapState<String, Integer>> mapState =
+ private final StateSpec<MapState<String, Integer>> mapState =
StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2264,9 +2264,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+ private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2310,9 +2311,10 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
@StateId(stateId)
- private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map();
+ private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
+
@StateId(countStateId)
- private final StateSpec<Object, CombiningState<Integer, int[], Integer>>
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
@@ -2356,16 +2358,13 @@ public class ParDoTest implements Serializable {
private static final double EPSILON = 0.0001;
@StateId(stateId)
- private final StateSpec<
- Object, CombiningState<Double, CountSum<Double>, Double>>
- combiningState =
- StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+ private final StateSpec<CombiningState<Double, CountSum<Double>, Double>> combiningState =
+ StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
@ProcessElement
public void processElement(
ProcessContext c,
- @StateId(stateId)
- CombiningState<Double, CountSum<Double>, Double> state) {
+ @StateId(stateId) CombiningState<Double, CountSum<Double>, Double> state) {
state.add(c.element().getValue());
Double currentValue = state.read();
if (Math.abs(currentValue - 0.5) < EPSILON) {
@@ -2396,40 +2395,38 @@ public class ParDoTest implements Serializable {
private static final int EXPECTED_SUM = 16;
@StateId(stateId)
- private final StateSpec<
- Object, CombiningState<Integer, MyInteger, Integer>>
- combiningState =
- StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
- @Override
- public MyInteger createAccumulator() {
- return new MyInteger(0);
- }
-
- @Override
- public MyInteger addInput(MyInteger accumulator, Integer input) {
- return new MyInteger(accumulator.getValue() + input);
- }
-
- @Override
- public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
- int newValue = 0;
- for (MyInteger myInteger : accumulators) {
- newValue += myInteger.getValue();
- }
- return new MyInteger(newValue);
- }
+ private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+ StateSpecs.combining(
+ new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ @Override
+ public MyInteger createAccumulator() {
+ return new MyInteger(0);
+ }
+
+ @Override
+ public MyInteger addInput(MyInteger accumulator, Integer input) {
+ return new MyInteger(accumulator.getValue() + input);
+ }
+
+ @Override
+ public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+ int newValue = 0;
+ for (MyInteger myInteger : accumulators) {
+ newValue += myInteger.getValue();
+ }
+ return new MyInteger(newValue);
+ }
- @Override
- public Integer extractOutput(MyInteger accumulator) {
- return accumulator.getValue();
- }
- });
+ @Override
+ public Integer extractOutput(MyInteger accumulator) {
+ return accumulator.getValue();
+ }
+ });
@ProcessElement
public void processElement(
ProcessContext c,
- @StateId(stateId)
- CombiningState<Integer, MyInteger, Integer> state) {
+ @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {
@@ -2458,40 +2455,38 @@ public class ParDoTest implements Serializable {
private static final int EXPECTED_SUM = 16;
@StateId(stateId)
- private final StateSpec<
- Object, CombiningState<Integer, MyInteger, Integer>>
- combiningState =
- StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() {
- @Override
- public MyInteger createAccumulator() {
- return new MyInteger(0);
- }
-
- @Override
- public MyInteger addInput(MyInteger accumulator, Integer input) {
- return new MyInteger(accumulator.getValue() + input);
- }
-
- @Override
- public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
- int newValue = 0;
- for (MyInteger myInteger : accumulators) {
- newValue += myInteger.getValue();
- }
- return new MyInteger(newValue);
- }
+ private final StateSpec<CombiningState<Integer, MyInteger, Integer>> combiningState =
+ StateSpecs.combining(
+ new Combine.CombineFn<Integer, MyInteger, Integer>() {
+ @Override
+ public MyInteger createAccumulator() {
+ return new MyInteger(0);
+ }
+
+ @Override
+ public MyInteger addInput(MyInteger accumulator, Integer input) {
+ return new MyInteger(accumulator.getValue() + input);
+ }
+
+ @Override
+ public MyInteger mergeAccumulators(Iterable<MyInteger> accumulators) {
+ int newValue = 0;
+ for (MyInteger myInteger : accumulators) {
+ newValue += myInteger.getValue();
+ }
+ return new MyInteger(newValue);
+ }
- @Override
- public Integer extractOutput(MyInteger accumulator) {
- return accumulator.getValue();
- }
- });
+ @Override
+ public Integer extractOutput(MyInteger accumulator) {
+ return accumulator.getValue();
+ }
+ });
@ProcessElement
public void processElement(
ProcessContext c,
- @StateId(stateId)
- CombiningState<Integer, MyInteger, Integer> state) {
+ @StateId(stateId) CombiningState<Integer, MyInteger, Integer> state) {
state.add(c.element().getValue());
Integer currentValue = state.read();
if (currentValue == EXPECTED_SUM) {
@@ -2523,7 +2518,7 @@ public class ParDoTest implements Serializable {
new DoFn<KV<String, Integer>, List<Integer>>() {
@StateId(stateId)
- private final StateSpec<Object, BagState<Integer>> bufferState =
+ private final StateSpec<BagState<Integer>> bufferState =
StateSpecs.bag(VarIntCoder.of());
@ProcessElement
@@ -2697,7 +2692,7 @@ public class ParDoTest implements Serializable {
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> stateSpec =
+ private final StateSpec<ValueState<String>> stateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 5732438..c16eea2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -188,7 +188,7 @@ public class DoFnInvokersTest {
class MockFn extends DoFn<String, String> {
@StateId(stateId)
- private final StateSpec<Object, ValueState<Integer>> spec =
+ private final StateSpec<ValueState<Integer>> spec =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index e1fa2d1..d6cc4f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -542,11 +542,11 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield1 =
+ private final StateSpec<ValueState<Integer>> myfield1 =
StateSpecs.value(VarIntCoder.of());
@StateId("my-id")
- private final StateSpec<Object, ValueState<Long>> myfield2 =
+ private final StateSpec<ValueState<Long>> myfield2 =
StateSpecs.value(VarLongCoder.of());
@ProcessElement
@@ -565,7 +565,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private StateSpec<Object, ValueState<Integer>> myfield =
+ private StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -618,7 +618,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield =
+ private final StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -644,7 +644,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield =
+ private final StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -668,7 +668,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("my-id")
- private final StateSpec<Object, ValueState<Integer>> myfield =
+ private final StateSpec<ValueState<Integer>> myfield =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -683,7 +683,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> bizzle =
+ private final StateSpec<ValueState<Integer>> bizzle =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -728,7 +728,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFnUsingState() {
@StateId(DoFnUsingState.STATE_ID)
- private final StateSpec<Object, ValueState<Integer>> spec =
+ private final StateSpec<ValueState<Integer>> spec =
StateSpecs.value(VarIntCoder.of());
}.getClass());
}
@@ -770,7 +770,7 @@ public class DoFnSignaturesTest {
DoFnSignatures.getSignature(
new DoFn<KV<String, Integer>, Long>() {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> bizzleDecl =
+ private final StateSpec<ValueState<Integer>> bizzleDecl =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -803,7 +803,7 @@ public class DoFnSignaturesTest {
public void testSimpleStateIdNamedDoFn() throws Exception {
class DoFnForTestSimpleStateIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> bizzle =
+ private final StateSpec<ValueState<Integer>> bizzle =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
@@ -831,7 +831,7 @@ public class DoFnSignaturesTest {
// Note that in order to have a coder for T it will require initialization in the constructor,
// but that isn't important for this test
@StateId("foo")
- private final StateSpec<Object, ValueState<T>> bizzle = null;
+ private final StateSpec<ValueState<T>> bizzle = null;
@ProcessElement
public void foo(ProcessContext context) {}
@@ -866,7 +866,7 @@ public class DoFnSignaturesTest {
public static final String STATE_ID = "my-state-id";
@StateId(STATE_ID)
- private final StateSpec<Object, ValueState<Integer>> bizzle =
+ private final StateSpec<ValueState<Integer>> bizzle =
StateSpecs.value(VarIntCoder.of());
}
@@ -882,7 +882,7 @@ public class DoFnSignaturesTest {
public static final String STATE_ID = "my-state-id";
@StateId(STATE_ID)
- private final StateSpec<Object, ValueState<String>> myStateSpec =
+ private final StateSpec<ValueState<String>> myStateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 9714d72..9b79d11 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -59,7 +59,7 @@ public class FakeStepContext implements StepContext {
}
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
throw new UnsupportedOperationException();
}