You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/10/23 15:21:54 UTC
[beam] branch master updated: Replace deprecated
StateTag.StateBinder in FlinkStateInternals (#6754)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f3cb463 Replace deprecated StateTag.StateBinder in FlinkStateInternals (#6754)
f3cb463 is described below
commit f3cb4630efe011f53fd0abe85c2f03836073faf6
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Oct 23 17:21:45 2018 +0200
Replace deprecated StateTag.StateBinder in FlinkStateInternals (#6754)
* Replace deprecated StateTag.StateBinder in FlinkStateInternals
* Convert anonymous class / Pass only required dependencies
---
.../streaming/state/FlinkStateInternals.java | 235 +++++++++++----------
1 file changed, 124 insertions(+), 111 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index a65e792..02a2ebe 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -38,7 +38,9 @@ import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateBinder;
import org.apache.beam.sdk.state.StateContext;
+import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
@@ -99,93 +101,108 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public <T extends State> T state(
- final StateNamespace namespace, StateTag<T> address, final StateContext<?> context) {
-
- return address.bind(
- new StateTag.StateBinder() {
+ StateNamespace namespace, StateTag<T> address, StateContext<?> context) {
+ return address
+ .getSpec()
+ .bind(
+ address.getId(),
+ new FlinkStateBinder(namespace, context, flinkStateBackend, watermarkHolds));
+ }
- @Override
- public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> address, Coder<T2> coder) {
+ private static class FlinkStateBinder implements StateBinder {
- return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
- }
-
- @Override
- public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, Coder<T2> elemCoder) {
+ private final StateNamespace namespace;
+ private final StateContext<?> stateContext;
+ private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+ private final Map<String, Instant> watermarkHolds;
- return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
- }
+ private FlinkStateBinder(
+ StateNamespace namespace,
+ StateContext<?> stateContext,
+ KeyedStateBackend<ByteBuffer> flinkStateBackend,
+ Map<String, Instant> watermarkHolds) {
+ this.namespace = namespace;
+ this.stateContext = stateContext;
+ this.flinkStateBackend = flinkStateBackend;
+ this.watermarkHolds = watermarkHolds;
+ }
- @Override
- public <T2> SetState<T2> bindSet(StateTag<SetState<T2>> address, Coder<T2> elemCoder) {
- return new FlinkSetState<>(flinkStateBackend, address, namespace, elemCoder);
- }
+ @Override
+ public <T2> ValueState<T2> bindValue(
+ String id, StateSpec<ValueState<T2>> spec, Coder<T2> coder) {
+ return new FlinkValueState<>(flinkStateBackend, id, namespace, coder);
+ }
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<MapState<KeyT, ValueT>> address,
- Coder<KeyT> mapKeyCoder,
- Coder<ValueT> mapValueCoder) {
- return new FlinkMapState<>(
- flinkStateBackend, address, namespace, mapKeyCoder, mapValueCoder);
- }
+ @Override
+ public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec, Coder<T2> elemCoder) {
+ return new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder);
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ @Override
+ public <T2> SetState<T2> bindSet(String id, StateSpec<SetState<T2>> spec, Coder<T2> elemCoder) {
+ return new FlinkSetState<>(flinkStateBackend, id, namespace, elemCoder);
+ }
- return new FlinkCombiningState<>(
- flinkStateBackend, address, combineFn, namespace, accumCoder);
- }
+ @Override
+ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ String id,
+ StateSpec<MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
+ return new FlinkMapState<>(flinkStateBackend, id, namespace, mapKeyCoder, mapValueCoder);
+ }
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
- StateTag<CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
- return new FlinkCombiningStateWithContext<>(
- flinkStateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkStateInternals.this,
- CombineContextFactory.createFromStateContext(context));
- }
+ @Override
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ return new FlinkCombiningState<>(flinkStateBackend, id, combineFn, namespace, accumCoder);
+ }
- @Override
- public WatermarkHoldState bindWatermark(
- StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
+ @Override
+ public <InputT, AccumT, OutputT>
+ CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ return new FlinkCombiningStateWithContext<>(
+ flinkStateBackend,
+ id,
+ combineFn,
+ namespace,
+ accumCoder,
+ CombineContextFactory.createFromStateContext(stateContext));
+ }
- return new FlinkWatermarkHoldState<>(
- flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner);
- }
- });
+ @Override
+ public WatermarkHoldState bindWatermark(
+ String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
+ return new FlinkWatermarkHoldState<>(
+ flinkStateBackend, watermarkHolds, id, namespace, timestampCombiner);
+ }
}
- private static class FlinkValueState<K, T> implements ValueState<T> {
+ private static class FlinkValueState<T> implements ValueState<T> {
private final StateNamespace namespace;
- private final StateTag<ValueState<T>> address;
+ private final String stateId;
private final ValueStateDescriptor<T> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkValueState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<ValueState<T>> address,
+ String stateId,
StateNamespace namespace,
Coder<T> coder) {
this.namespace = namespace;
- this.address = address;
+ this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
- flinkStateDescriptor =
- new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(coder));
+ flinkStateDescriptor = new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder));
}
@Override
@@ -238,15 +255,15 @@ public class FlinkStateInternals<K> implements StateInternals {
return false;
}
- FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o;
+ FlinkValueState<?> that = (FlinkValueState<?>) o;
- return namespace.equals(that.namespace) && address.equals(that.address);
+ return namespace.equals(that.namespace) && stateId.equals(that.stateId);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
+ result = 31 * result + stateId.hashCode();
return result;
}
}
@@ -254,22 +271,21 @@ public class FlinkStateInternals<K> implements StateInternals {
private static class FlinkBagState<K, T> implements BagState<T> {
private final StateNamespace namespace;
- private final StateTag<BagState<T>> address;
+ private final String stateId;
private final ListStateDescriptor<T> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkBagState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<BagState<T>> address,
+ String stateId,
StateNamespace namespace,
Coder<T> coder) {
this.namespace = namespace;
- this.address = address;
+ this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
- flinkStateDescriptor =
- new ListStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(coder));
+ flinkStateDescriptor = new ListStateDescriptor<>(stateId, new CoderTypeSerializer<>(coder));
}
@Override
@@ -351,13 +367,13 @@ public class FlinkStateInternals<K> implements StateInternals {
FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o;
- return namespace.equals(that.namespace) && address.equals(that.address);
+ return namespace.equals(that.namespace) && stateId.equals(that.stateId);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
+ result = 31 * result + stateId.hashCode();
return result;
}
}
@@ -366,25 +382,25 @@ public class FlinkStateInternals<K> implements StateInternals {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
+ private final String stateId;
private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkCombiningState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<CombiningState<InputT, AccumT, OutputT>> address,
+ String stateId,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder) {
this.namespace = namespace;
- this.address = address;
+ this.stateId = stateId;
this.combineFn = combineFn;
this.flinkStateBackend = flinkStateBackend;
flinkStateDescriptor =
- new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(accumCoder));
+ new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(accumCoder));
}
@Override
@@ -510,13 +526,13 @@ public class FlinkStateInternals<K> implements StateInternals {
FlinkCombiningState<?, ?, ?, ?> that = (FlinkCombiningState<?, ?, ?, ?>) o;
- return namespace.equals(that.namespace) && address.equals(that.address);
+ return namespace.equals(that.namespace) && stateId.equals(that.stateId);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
+ result = 31 * result + stateId.hashCode();
return result;
}
}
@@ -525,31 +541,28 @@ public class FlinkStateInternals<K> implements StateInternals {
implements CombiningState<InputT, AccumT, OutputT> {
private final StateNamespace namespace;
- private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
+ private final String stateId;
private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private final FlinkStateInternals<K> flinkStateInternals;
private final CombineWithContext.Context context;
FlinkCombiningStateWithContext(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<CombiningState<InputT, AccumT, OutputT>> address,
+ String stateId,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
- FlinkStateInternals<K> flinkStateInternals,
CombineWithContext.Context context) {
this.namespace = namespace;
- this.address = address;
+ this.stateId = stateId;
this.combineFn = combineFn;
this.flinkStateBackend = flinkStateBackend;
- this.flinkStateInternals = flinkStateInternals;
this.context = context;
flinkStateDescriptor =
- new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(accumCoder));
+ new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(accumCoder));
}
@Override
@@ -676,40 +689,40 @@ public class FlinkStateInternals<K> implements StateInternals {
FlinkCombiningStateWithContext<?, ?, ?, ?> that =
(FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
- return namespace.equals(that.namespace) && address.equals(that.address);
+ return namespace.equals(that.namespace) && stateId.equals(that.stateId);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
+ result = 31 * result + stateId.hashCode();
return result;
}
}
private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
implements WatermarkHoldState {
- private final StateTag<WatermarkHoldState> address;
+ private final String stateId;
private final TimestampCombiner timestampCombiner;
private final StateNamespace namespace;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private final FlinkStateInternals<K> flinkStateInternals;
+ private final Map<String, Instant> watermarkHolds;
private final ValueStateDescriptor<Instant> flinkStateDescriptor;
public FlinkWatermarkHoldState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- FlinkStateInternals<K> flinkStateInternals,
- StateTag<WatermarkHoldState> address,
+ Map<String, Instant> watermarkHolds,
+ String stateId,
StateNamespace namespace,
TimestampCombiner timestampCombiner) {
- this.address = address;
+ this.stateId = stateId;
this.timestampCombiner = timestampCombiner;
this.namespace = namespace;
this.flinkStateBackend = flinkStateBackend;
- this.flinkStateInternals = flinkStateInternals;
+ this.watermarkHolds = watermarkHolds;
flinkStateDescriptor =
- new ValueStateDescriptor<>(address.getId(), new CoderTypeSerializer<>(InstantCoder.of()));
+ new ValueStateDescriptor<>(stateId, new CoderTypeSerializer<>(InstantCoder.of()));
}
@Override
@@ -755,11 +768,11 @@ public class FlinkStateInternals<K> implements StateInternals {
Instant current = state.value();
if (current == null) {
state.update(value);
- flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
+ watermarkHolds.put(namespace.stringKey(), value);
} else {
Instant combined = timestampCombiner.combine(current, value);
state.update(combined);
- flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
+ watermarkHolds.put(namespace.stringKey(), combined);
}
} catch (Exception e) {
throw new RuntimeException("Error updating state.", e);
@@ -780,7 +793,7 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public void clear() {
- flinkStateInternals.watermarkHolds.remove(namespace.stringKey());
+ watermarkHolds.remove(namespace.stringKey());
try {
org.apache.flink.api.common.state.ValueState<Instant> state =
flinkStateBackend.getPartitionedState(
@@ -802,7 +815,7 @@ public class FlinkStateInternals<K> implements StateInternals {
FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o;
- if (!address.equals(that.address)) {
+ if (!stateId.equals(that.stateId)) {
return false;
}
if (!timestampCombiner.equals(that.timestampCombiner)) {
@@ -813,7 +826,7 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public int hashCode() {
- int result = address.hashCode();
+ int result = stateId.hashCode();
result = 31 * result + timestampCombiner.hashCode();
result = 31 * result + namespace.hashCode();
return result;
@@ -823,22 +836,22 @@ public class FlinkStateInternals<K> implements StateInternals {
private static class FlinkMapState<KeyT, ValueT> implements MapState<KeyT, ValueT> {
private final StateNamespace namespace;
- private final StateTag<MapState<KeyT, ValueT>> address;
+ private final String stateId;
private final MapStateDescriptor<KeyT, ValueT> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkMapState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<MapState<KeyT, ValueT>> address,
+ String stateId,
StateNamespace namespace,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
this.namespace = namespace;
- this.address = address;
+ this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
this.flinkStateDescriptor =
new MapStateDescriptor<>(
- address.getId(),
+ stateId,
new CoderTypeSerializer<>(mapKeyCoder),
new CoderTypeSerializer<>(mapValueCoder));
}
@@ -996,13 +1009,13 @@ public class FlinkStateInternals<K> implements StateInternals {
FlinkMapState<?, ?> that = (FlinkMapState<?, ?>) o;
- return namespace.equals(that.namespace) && address.equals(that.address);
+ return namespace.equals(that.namespace) && stateId.equals(that.stateId);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
+ result = 31 * result + stateId.hashCode();
return result;
}
}
@@ -1010,21 +1023,21 @@ public class FlinkStateInternals<K> implements StateInternals {
private static class FlinkSetState<T> implements SetState<T> {
private final StateNamespace namespace;
- private final StateTag<SetState<T>> address;
+ private final String stateId;
private final MapStateDescriptor<T, Boolean> flinkStateDescriptor;
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
FlinkSetState(
KeyedStateBackend<ByteBuffer> flinkStateBackend,
- StateTag<SetState<T>> address,
+ String stateId,
StateNamespace namespace,
Coder<T> coder) {
this.namespace = namespace;
- this.address = address;
+ this.stateId = stateId;
this.flinkStateBackend = flinkStateBackend;
this.flinkStateDescriptor =
new MapStateDescriptor<>(
- address.getId(), new CoderTypeSerializer<>(coder), new BooleanSerializer());
+ stateId, new CoderTypeSerializer<>(coder), new BooleanSerializer());
}
@Override
@@ -1147,13 +1160,13 @@ public class FlinkStateInternals<K> implements StateInternals {
FlinkSetState<?> that = (FlinkSetState<?>) o;
- return namespace.equals(that.namespace) && address.equals(that.address);
+ return namespace.equals(that.namespace) && stateId.equals(that.stateId);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
+ result = 31 * result + stateId.hashCode();
return result;
}
}