You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/02 18:56:21 UTC
[3/6] beam git commit: Simplify type parameters of StateSpec and related
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 31e931c..cfe3f9b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.beam.sdk.util.state.BagState;
@@ -62,7 +61,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
* <p>Note: Ignore index of key.
* Mainly for SideInputs.
*/
-public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
+public class FlinkBroadcastStateInternals<K> implements StateInternals {
private int indexInSubtaskGroup;
private final DefaultOperatorStateBackend stateBackend;
@@ -86,7 +85,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -94,36 +93,36 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -133,7 +132,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
@@ -144,7 +143,7 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new FlinkCombiningStateWithContext<>(
@@ -158,8 +157,8 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
@@ -302,11 +301,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
extends AbstractBroadcastState<T> implements ValueState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, ValueState<T>> address;
+ private final StateTag<ValueState<T>> address;
FlinkBroadcastValueState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, ValueState<T>> address,
+ StateTag<ValueState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
super(flinkStateBackend, address.getId(), namespace, coder);
@@ -363,11 +362,11 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements BagState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
FlinkBroadcastBagState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));
@@ -451,12 +450,12 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
FlinkCombiningState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder) {
@@ -568,13 +567,13 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
FlinkKeyedCombiningState(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -704,14 +703,14 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final FlinkBroadcastStateInternals<K> flinkStateInternals;
private final CombineWithContext.Context context;
FlinkCombiningStateWithContext(
DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index 67d7966..c9b7797 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.state.BagState;
@@ -67,7 +66,7 @@ import org.apache.flink.util.Preconditions;
*
* <p>Reference from {@link HeapInternalTimerService} to the local key-group range.
*/
-public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
+public class FlinkKeyGroupStateInternals<K> implements StateInternals {
private final Coder<K> keyCoder;
private final KeyGroupsList localKeyGroupRange;
@@ -109,7 +108,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -117,36 +116,36 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", ValueState.class.getSimpleName()));
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -156,7 +155,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -165,7 +164,7 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException(
@@ -173,8 +172,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -334,10 +333,10 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
implements BagState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
FlinkKeyGroupBagState(
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
super(address.getId(), namespace.stringKey(), ListCoder.of(coder),
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index ef6c3b2..3d38f88 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -53,7 +52,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
* Ignore index of key and namespace.
* Just implement BagState.
*/
-public class FlinkSplitStateInternals<K> implements StateInternals<K> {
+public class FlinkSplitStateInternals<K> implements StateInternals {
private final OperatorStateBackend stateBackend;
@@ -69,7 +68,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -77,36 +76,36 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", ValueState.class.getSimpleName()));
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -116,7 +115,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException("bindCombiningValue is not supported.");
@@ -125,7 +124,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException(
@@ -133,8 +132,8 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
@@ -147,11 +146,11 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> {
private final ListStateDescriptor<T> descriptor;
private OperatorStateBackend flinkStateBackend;
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
FlinkSplitBagState(
OperatorStateBackend flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
this.flinkStateBackend = flinkStateBackend;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c99d085..c033be6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
* <p>Note: In the Flink streaming runner the key is always encoded
* using an {@link Coder} and stored in a {@link ByteBuffer}.
*/
-public class FlinkStateInternals<K> implements StateInternals<K> {
+public class FlinkStateInternals<K> implements StateInternals {
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
private Coder<K> keyCoder;
@@ -95,7 +95,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address) {
+ StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@@ -103,36 +103,36 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <T extends State> T state(
final StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
final StateContext<?> context) {
return address.bind(
- new StateTag.StateBinder<K>() {
+ new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ StateTag<ValueState<T>> address, Coder<T> coder) {
return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
}
@Override
public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
}
@Override
public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address, Coder<T> elemCoder) {
+ StateTag<SetState<T>> address, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
@@ -142,7 +142,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
@@ -153,7 +153,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new FlinkCombiningStateWithContext<>(
@@ -167,8 +167,8 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return new FlinkWatermarkHoldState<>(
@@ -180,13 +180,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private static class FlinkValueState<K, T> implements ValueState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, ValueState<T>> address;
+ private final StateTag<ValueState<T>> address;
private final ValueStateDescriptor<T> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkValueState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, ValueState<T>> address,
+ StateTag<ValueState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
@@ -266,13 +266,13 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private static class FlinkBagState<K, T> implements BagState<T> {
private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
+ private final StateTag<BagState<T>> address;
private final ListStateDescriptor<T> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkBagState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
+ StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder) {
@@ -379,14 +379,14 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder) {
@@ -547,7 +547,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -555,7 +555,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
FlinkKeyedCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -718,7 +718,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+ private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -727,7 +727,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
FlinkCombiningStateWithContext(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
@@ -886,7 +886,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
implements WatermarkHoldState {
- private final StateTag<? super K, WatermarkHoldState> address;
+ private final StateTag<WatermarkHoldState> address;
private final TimestampCombiner timestampCombiner;
private final StateNamespace namespace;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
@@ -896,7 +896,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
public FlinkWatermarkHoldState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
FlinkStateInternals<K> flinkStateInternals,
- StateTag<? super K, WatermarkHoldState> address,
+ StateTag<WatermarkHoldState> address,
StateNamespace namespace,
TimestampCombiner timestampCombiner) {
this.address = address;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4e18ac2..bda30e4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -196,7 +196,7 @@ public class DoFnOperatorTest {
DoFn<Integer, String> fn = new DoFn<Integer, String>() {
@StateId("state")
- private final StateSpec<Object, ValueState<String>> stateSpec =
+ private final StateSpec<ValueState<String>> stateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
@@ -296,7 +296,7 @@ public class DoFnOperatorTest {
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId(stateId)
- private final StateSpec<Object, ValueState<String>> stateSpec =
+ private final StateSpec<ValueState<String>> stateSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 7e7d1e1..eb2c05f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -56,12 +56,12 @@ public class FlinkBroadcastStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ private static final StateTag<CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
FlinkBroadcastStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
index 5433d07..0e0267b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
@@ -64,7 +64,7 @@ public class FlinkKeyGroupStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
FlinkKeyGroupStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
index 08ae0c4..8033a9d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
@@ -47,7 +47,7 @@ public class FlinkSplitStateInternalsTest {
private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
FlinkSplitStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 17c43bf..cd00d9e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -70,18 +70,18 @@ public class FlinkStateInternalsTest {
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ private static final StateTag<CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
FlinkStateInternals<String> underTest;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index ce7f678..38129ab 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -103,7 +103,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
private static class DummyStatefulDoFn extends DoFn<KV<Integer, Integer>, Integer> {
@StateId("foo")
- private final StateSpec<Object, ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
+ private final StateSpec<ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
@ProcessElement
public void processElem(ProcessContext c) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 343d51b..63e1166 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -890,7 +890,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
ParDo.of(
new DoFn<KV<Integer, Integer>, Integer>() {
@StateId("unused")
- final StateSpec<Object, ValueState<Integer>> stateSpec =
+ final StateSpec<ValueState<Integer>> stateSpec =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index cdc23ff..afaba3a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
@@ -51,7 +50,7 @@ import org.joda.time.Instant;
/**
* An implementation of {@link StateInternals} for the SparkRunner.
*/
-class SparkStateInternals<K> implements StateInternals<K> {
+class SparkStateInternals<K> implements StateInternals {
private final K key;
//Serializable state for internals (namespace to state tag to coded value).
@@ -86,50 +85,47 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+ public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
return state(namespace, address, StateContexts.nullContext());
}
@Override
public <T extends State> T state(
StateNamespace namespace,
- StateTag<? super K, T> address,
+ StateTag<T> address,
StateContext<?> c) {
- return address.bind(new SparkStateBinder(key, namespace, c));
+ return address.bind(new SparkStateBinder(namespace, c));
}
- private class SparkStateBinder implements StateBinder<K> {
- private final K key;
+ private class SparkStateBinder implements StateBinder {
private final StateNamespace namespace;
private final StateContext<?> c;
- private SparkStateBinder(K key,
- StateNamespace namespace,
+ private SparkStateBinder(StateNamespace namespace,
StateContext<?> c) {
- this.key = key;
this.namespace = namespace;
this.c = c;
}
@Override
- public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
return new SparkValueState<>(namespace, address, coder);
}
@Override
- public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
return new SparkBagState<>(namespace, address, elemCoder);
}
@Override
- public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) {
+ public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", SetState.class.getSimpleName()));
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
throw new UnsupportedOperationException(
String.format("%s is not supported", MapState.class.getSimpleName()));
@@ -138,7 +134,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SparkCombiningState<>(namespace, address, accumCoder, combineFn);
@@ -147,7 +143,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
@Override
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new SparkCombiningState<>(
@@ -155,8 +151,8 @@ class SparkStateInternals<K> implements StateInternals<K> {
}
@Override
- public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
- StateTag<? super K, WatermarkHoldState> address,
+ public WatermarkHoldState bindWatermark(
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return new SparkWatermarkHoldState(namespace, address, timestampCombiner);
}
@@ -164,12 +160,12 @@ class SparkStateInternals<K> implements StateInternals<K> {
private class AbstractState<T> {
final StateNamespace namespace;
- final StateTag<?, ? extends State> address;
+ final StateTag<? extends State> address;
final Coder<T> coder;
private AbstractState(
StateNamespace namespace,
- StateTag<?, ? extends State> address,
+ StateTag<? extends State> address,
Coder<T> coder) {
this.namespace = namespace;
this.address = address;
@@ -218,7 +214,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
private SparkValueState(
StateNamespace namespace,
- StateTag<?, ValueState<T>> address,
+ StateTag<ValueState<T>> address,
Coder<T> coder) {
super(namespace, address, coder);
}
@@ -246,7 +242,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
public SparkWatermarkHoldState(
StateNamespace namespace,
- StateTag<?, WatermarkHoldState> address,
+ StateTag<WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
this.timestampCombiner = timestampCombiner;
@@ -300,7 +296,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
private SparkCombiningState(
StateNamespace namespace,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+ StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
@@ -363,7 +359,7 @@ class SparkStateInternals<K> implements StateInternals<K> {
private final class SparkBagState<T> extends AbstractState<List<T>> implements BagState<T> {
private SparkBagState(
StateNamespace namespace,
- StateTag<?, BagState<T>> address,
+ StateTag<BagState<T>> address,
Coder<T> coder) {
super(namespace, address, ListCoder.of(coder));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 0a00c45..063feef 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -93,7 +93,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());
- StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+ StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 3e8dde5..ffe343b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -124,7 +124,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
Coder<W> windowCoder) throws IOException { }
@Override
- public StateInternals<?> stateInternals() {
+ public StateInternals stateInternals() {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index ef1ff9f..7b6f9ed 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -65,7 +65,7 @@ public final class TranslationUtils {
*/
static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable {
@Override
- public StateInternals<K> stateInternalsForKey(K key) {
+ public StateInternals stateInternalsForKey(K key) {
return InMemoryStateInternals.forKey(key);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9b99ca4..0368476 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -331,10 +331,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/**
* Annotation for declaring and dereferencing state cells.
*
- * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link
- * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State}
- * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and
- * annotate it with {@link StateId}. See the following code for an example:
+ * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a
+ * {@link StateId}. To use the cell during processing, add a parameter of the appropriate {@link
+ * State} subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer}
+ * method, and annotate it with {@link StateId}. See the following code for an example:
*
* <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
*
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index b5547e3..02f3a85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -112,14 +112,14 @@ public class GroupIntoBatches<K, InputT>
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId(BATCH_ID)
- private final StateSpec<Object, BagState<InputT>> batchSpec;
+ private final StateSpec<BagState<InputT>> batchSpec;
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
- private final StateSpec<Object, CombiningState<Long, Long, Long>>
+ private final StateSpec<CombiningState<Long, Long, Long>>
numElementsInBatchSpec;
@StateId(KEY_ID)
- private final StateSpec<Object, ValueState<K>> keySpec;
+ private final StateSpec<ValueState<K>> keySpec;
private final long prefetchFrequency;
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 1f6afbf..6137a7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -446,7 +446,7 @@ public class ParDo {
Map<String, DoFnSignature.StateDeclaration> stateDeclarations = signature.stateDeclarations();
for (DoFnSignature.StateDeclaration stateDeclaration : stateDeclarations.values()) {
try {
- StateSpec<?, ?> stateSpec = (StateSpec<?, ?>) stateDeclaration.field().get(fn);
+ StateSpec<?> stateSpec = (StateSpec<?>) stateDeclaration.field().get(fn);
stateSpec.offerCoders(codersForStateSpecTypes(stateDeclaration, coderRegistry, inputCoder));
stateSpec.finishSpecifying();
} catch (IllegalAccessException e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index 6fe37a1..48fa742 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -20,34 +20,36 @@ package org.apache.beam.sdk.util.state;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
* Visitor for binding a {@link StateSpec} and to the associated {@link State}.
- *
- * @param <K> the type of key this binder embodies.
*/
-public interface StateBinder<K> {
- <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder);
+public interface StateBinder {
+ <T> ValueState<T> bindValue(
+ String id, StateSpec<ValueState<T>> spec, Coder<T> coder);
- <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder);
+ <T> BagState<T> bindBag(
+ String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder);
- <T> SetState<T> bindSet(String id, StateSpec<? super K, SetState<T>> spec, Coder<T> elemCoder);
+ <T> SetState<T> bindSet(
+ String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder);
<KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder);
+ String id,
+ StateSpec<MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder);
<InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
<InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
String id,
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
@@ -57,8 +59,8 @@ public interface StateBinder<K> {
* <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added
* to the returned {@link WatermarkHoldState} are to be combined.
*/
- <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+ WatermarkHoldState bindWatermark(
String id,
- StateSpec<? super K, WatermarkHoldState> spec,
+ StateSpec<WatermarkHoldState> spec,
TimestampCombiner timestampCombiner);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
index 6b94c40..8eda218 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java
@@ -26,23 +26,22 @@ import org.apache.beam.sdk.coders.Coder;
* A specification of a persistent state cell. This includes information necessary to encode the
* value and details about the intended access pattern.
*
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- * accept values of type {@code StateSpec<? super K, StateT>}.
* @param <StateT> The type of state being described.
*/
@Experimental(Kind.STATE)
-public interface StateSpec<K, StateT extends State> extends Serializable {
+public interface StateSpec<StateT extends State> extends Serializable {
/**
* Use the {@code binder} to create an instance of {@code StateT} appropriate for this address.
*/
- StateT bind(String id, StateBinder<? extends K> binder);
+ StateT bind(String id, StateBinder binder);
/**
- * Given {code coders} are inferred from type arguments defined for this class.
- * Coders which are already set should take precedence over offered coders.
- * @param coders Array of coders indexed by the type arguments order. Entries might be null if
- * the coder could not be inferred.
+ * Given {code coders} are inferred from type arguments defined for this class. Coders which are
+ * already set should take precedence over offered coders.
+ *
+ * @param coders Array of coders indexed by the type arguments order. Entries might be null if the
+ * coder could not be inferred.
*/
void offerCoders(Coder[] coders);
http://git-wip-us.apache.org/repos/asf/beam/blob/8fe59c35/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index a057a0b..49d5722 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/**
@@ -42,12 +41,12 @@ public class StateSpecs {
private StateSpecs() {}
/** Create a simple state spec for values of type {@code T}. */
- public static <T> StateSpec<Object, ValueState<T>> value() {
+ public static <T> StateSpec<ValueState<T>> value() {
return new ValueStateSpec<>(null);
}
/** Create a simple state spec for values of type {@code T}. */
- public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) {
+ public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
return new ValueStateSpec<>(valueCoder);
}
@@ -57,17 +56,27 @@ public class StateSpecs {
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn);
}
/**
+ * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
+ * multiple {@code InputT}s into a single {@code OutputT}.
+ */
+ public static <InputT, AccumT, OutputT>
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn);
+ }
+
+ /**
* Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
* {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
checkArgument(accumCoder != null,
"accumCoder should not be null. "
@@ -80,11 +89,8 @@ public class StateSpecs {
* multiple {@code InputT}s into a single {@code OutputT}.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
- checkArgument(accumCoder != null,
- "accumCoder should not be null. "
- + "Consider using combining(CombineFn<> combineFn) instead.");
return combiningInternal(accumCoder, combineFn);
}
@@ -96,7 +102,7 @@ public class StateSpecs {
* only be used to initialize static values.
*/
public static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>>
+ StateSpec<CombiningState<InputT, AccumT, OutputT>>
combiningFromInputInternal(
Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
@@ -113,13 +119,13 @@ public class StateSpecs {
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
private static <InputT, AccumT, OutputT>
- StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
}
@@ -128,7 +134,7 @@ public class StateSpecs {
* Create a state spec that is optimized for adding values frequently, and occasionally retrieving
* all the values that have been added.
*/
- public static <T> StateSpec<Object, BagState<T>> bag() {
+ public static <T> StateSpec<BagState<T>> bag() {
return bag(null);
}
@@ -136,49 +142,46 @@ public class StateSpecs {
* Create a state spec that is optimized for adding values frequently, and occasionally retrieving
* all the values that have been added.
*/
- public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) {
+ public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
return new BagStateSpec<>(elemCoder);
}
/**
* Create a state spec that supporting for {@link java.util.Set} like access patterns.
*/
- public static <T> StateSpec<Object, SetState<T>> set() {
+ public static <T> StateSpec<SetState<T>> set() {
return set(null);
}
/**
* Create a state spec that supporting for {@link java.util.Set} like access patterns.
*/
- public static <T> StateSpec<Object, SetState<T>> set(Coder<T> elemCoder) {
+ public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
return new SetStateSpec<>(elemCoder);
}
/**
* Create a state spec that supporting for {@link java.util.Map} like access patterns.
*/
- public static <K, V> StateSpec<Object, MapState<K, V>> map() {
+ public static <K, V> StateSpec<MapState<K, V>> map() {
return new MapStateSpec<>(null, null);
}
- /**
- * Create a state spec that supporting for {@link java.util.Map} like access patterns.
- */
- public static <K, V> StateSpec<Object, MapState<K, V>> map(Coder<K> keyCoder,
- Coder<V> valueCoder) {
+ /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */
+ public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
return new MapStateSpec<>(keyCoder, valueCoder);
}
/** Create a state spec for holding the watermark. */
- public static <W extends BoundedWindow>
- StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
+ public static
+ StateSpec<WatermarkHoldState> watermarkStateInternal(
TimestampCombiner timestampCombiner) {
- return new WatermarkStateSpecInternal<W>(timestampCombiner);
+ return new WatermarkStateSpecInternal(timestampCombiner);
}
- public static <K, InputT, AccumT, OutputT>
- StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
- StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
+ public static <InputT, AccumT, OutputT>
+ StateSpec<BagState<AccumT>> convertToBagSpecInternal(
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
if (combiningSpec instanceof CombiningStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@@ -201,7 +204,7 @@ public class StateSpecs {
*
* <p>Includes the coder for {@code T}.
*/
- private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> {
+ private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
@Nullable
private Coder<T> coder;
@@ -211,7 +214,7 @@ public class StateSpecs {
}
@Override
- public ValueState<T> bind(String id, StateBinder<?> visitor) {
+ public ValueState<T> bind(String id, StateBinder visitor) {
return visitor.bindValue(id, this, coder);
}
@@ -260,7 +263,7 @@ public class StateSpecs {
* <p>Includes the {@link CombineFn} and the coder for the accumulator type.
*/
private static class CombiningStateSpec<InputT, AccumT, OutputT>
- implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
@Nullable
private Coder<AccumT> accumCoder;
@@ -275,7 +278,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
- String id, StateBinder<? extends Object> visitor) {
+ String id, StateBinder visitor) {
return visitor.bindCombining(id, this, accumCoder, combineFn);
}
@@ -320,7 +323,7 @@ public class StateSpecs {
return Objects.hash(getClass(), accumCoder);
}
- private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+ private StateSpec<BagState<AccumT>> asBagSpec() {
return new BagStateSpec<AccumT>(accumCoder);
}
}
@@ -332,7 +335,7 @@ public class StateSpecs {
* <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
*/
private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
- implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
+ implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
@Nullable private Coder<AccumT> accumCoder;
private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
@@ -346,7 +349,7 @@ public class StateSpecs {
@Override
public CombiningState<InputT, AccumT, OutputT> bind(
- String id, StateBinder<? extends Object> visitor) {
+ String id, StateBinder visitor) {
return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
}
@@ -392,7 +395,7 @@ public class StateSpecs {
return Objects.hash(getClass(), accumCoder);
}
- private StateSpec<Object, BagState<AccumT>> asBagSpec() {
+ private StateSpec<BagState<AccumT>> asBagSpec() {
return new BagStateSpec<AccumT>(accumCoder);
}
}
@@ -403,7 +406,7 @@ public class StateSpecs {
*
* <p>Includes the coder for the element type {@code T}</p>
*/
- private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> {
+ private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
@Nullable
private Coder<T> elemCoder;
@@ -413,7 +416,7 @@ public class StateSpecs {
}
@Override
- public BagState<T> bind(String id, StateBinder<?> visitor) {
+ public BagState<T> bind(String id, StateBinder visitor) {
return visitor.bindBag(id, this, elemCoder);
}
@@ -456,7 +459,7 @@ public class StateSpecs {
}
}
- private static class MapStateSpec<K, V> implements StateSpec<Object, MapState<K, V>> {
+ private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
@Nullable
private Coder<K> keyCoder;
@@ -469,7 +472,7 @@ public class StateSpecs {
}
@Override
- public MapState<K, V> bind(String id, StateBinder<?> visitor) {
+ public MapState<K, V> bind(String id, StateBinder visitor) {
return visitor.bindMap(id, this, keyCoder, valueCoder);
}
@@ -523,7 +526,7 @@ public class StateSpecs {
*
* <p>Includes the coder for the element type {@code T}</p>
*/
- private static class SetStateSpec<T> implements StateSpec<Object, SetState<T>> {
+ private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
@Nullable
private Coder<T> elemCoder;
@@ -533,7 +536,7 @@ public class StateSpecs {
}
@Override
- public SetState<T> bind(String id, StateBinder<?> visitor) {
+ public SetState<T> bind(String id, StateBinder visitor) {
return visitor.bindSet(id, this, elemCoder);
}
@@ -582,8 +585,7 @@ public class StateSpecs {
* <p>Includes the {@link TimestampCombiner} according to which the output times
* are combined.
*/
- private static class WatermarkStateSpecInternal<W extends BoundedWindow>
- implements StateSpec<Object, WatermarkHoldState> {
+ private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
/**
* When multiple output times are added to hold the watermark, this determines how they are
@@ -597,7 +599,7 @@ public class StateSpecs {
}
@Override
- public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
+ public WatermarkHoldState bind(String id, StateBinder visitor) {
return visitor.bindWatermark(id, this, timestampCombiner);
}