You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/06/06 15:20:36 UTC
[1/2] beam git commit: Flink runner: support MapState in
FlinkStateInternals.
Repository: beam
Updated Branches:
refs/heads/master aebd3a4c5 -> a05455088
Flink runner: support MapState in FlinkStateInternals.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dbab052c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dbab052c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dbab052c
Branch: refs/heads/master
Commit: dbab052c4456ff51dd4ce44979c77a508acc17e9
Parents: aebd3a4
Author: 波特 <ha...@alibaba-inc.com>
Authored: Thu May 18 12:23:20 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 6 23:18:33 2017 +0800
----------------------------------------------------------------------
runners/flink/pom.xml | 1 -
.../streaming/state/FlinkStateInternals.java | 205 ++++++++++++++++++-
2 files changed, 202 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dbab052c/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 92f95a0..c4c6b55 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -92,7 +92,6 @@
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.LargeKeys$Above100MB,
org.apache.beam.sdk.testing.UsesSetState,
- org.apache.beam.sdk.testing.UsesMapState,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream,
org.apache.beam.sdk.testing.UsesSplittableParDo
http://git-wip-us.apache.org/repos/asf/beam/blob/dbab052c/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index b73abe9..f0d3278 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
@@ -46,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineContextFactory;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -132,11 +134,11 @@ public class FlinkStateInternals<K> implements StateInternals {
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<MapState<KeyT, ValueT>> spec,
+ StateTag<MapState<KeyT, ValueT>> address,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
+ return new FlinkMapState<>(
+ flinkStateBackend, address, namespace, mapKeyCoder, mapValueCoder);
}
@Override
@@ -1029,4 +1031,201 @@ public class FlinkStateInternals<K> implements StateInternals {
return result;
}
}
+
+ private static class FlinkMapState<KeyT, ValueT> implements MapState<KeyT, ValueT> {
+
+ private final StateNamespace namespace;
+ private final StateTag<MapState<KeyT, ValueT>> address;
+ private final MapStateDescriptor<KeyT, ValueT> flinkStateDescriptor;
+ private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+ FlinkMapState(
+ KeyedStateBackend<ByteBuffer> flinkStateBackend,
+ StateTag<MapState<KeyT, ValueT>> address,
+ StateNamespace namespace,
+ Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+ this.namespace = namespace;
+ this.address = address;
+ this.flinkStateBackend = flinkStateBackend;
+ this.flinkStateDescriptor = new MapStateDescriptor<>(address.getId(),
+ new CoderTypeSerializer<>(mapKeyCoder), new CoderTypeSerializer<>(mapValueCoder));
+ }
+
+ @Override
+ public ReadableState<ValueT> get(final KeyT input) {
+ return new ReadableState<ValueT>() {
+ @Override
+ public ValueT read() {
+ try {
+ return flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).get(input);
+ } catch (Exception e) {
+ throw new RuntimeException("Error get from state.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<ValueT> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void put(KeyT key, ValueT value) {
+ try {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).put(key, value);
+ } catch (Exception e) {
+ throw new RuntimeException("Error put kv to state.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<ValueT> putIfAbsent(final KeyT key, final ValueT value) {
+ return new ReadableState<ValueT>() {
+ @Override
+ public ValueT read() {
+ try {
+ ValueT current = flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).get(key);
+
+ if (current == null) {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).put(key, value);
+ }
+ return current;
+ } catch (Exception e) {
+ throw new RuntimeException("Error put kv to state.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<ValueT> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void remove(KeyT key) {
+ try {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).remove(key);
+ } catch (Exception e) {
+ throw new RuntimeException("Error remove map state key.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Iterable<KeyT>> keys() {
+ return new ReadableState<Iterable<KeyT>>() {
+ @Override
+ public Iterable<KeyT> read() {
+ try {
+ return flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).keys();
+ } catch (Exception e) {
+ throw new RuntimeException("Error get map state keys.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Iterable<KeyT>> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Iterable<ValueT>> values() {
+ return new ReadableState<Iterable<ValueT>>() {
+ @Override
+ public Iterable<ValueT> read() {
+ try {
+ return flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).values();
+ } catch (Exception e) {
+ throw new RuntimeException("Error get map state values.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Iterable<ValueT>> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
+ return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>() {
+ @Override
+ public Iterable<Map.Entry<KeyT, ValueT>> read() {
+ try {
+ return flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).entries();
+ } catch (Exception e) {
+ throw new RuntimeException("Error get map state entries.", e);
+ }
+ }
+
+ @Override
+ public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void clear() {
+ try {
+ flinkStateBackend.getPartitionedState(
+ namespace.stringKey(),
+ StringSerializer.INSTANCE,
+ flinkStateDescriptor).clear();
+ } catch (Exception e) {
+ throw new RuntimeException("Error clearing state.", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FlinkMapState<?, ?> that = (FlinkMapState<?, ?>) o;
+
+ return namespace.equals(that.namespace) && address.equals(that.address);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + address.hashCode();
+ return result;
+ }
+ }
+
}
[2/2] beam git commit: This closes #3289
Posted by pe...@apache.org.
This closes #3289
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a0545508
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a0545508
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a0545508
Branch: refs/heads/master
Commit: a05455088de338b46582a55df974865561dc70e7
Parents: aebd3a4 dbab052
Author: Pei He <pe...@apache.org>
Authored: Tue Jun 6 23:19:25 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 6 23:19:25 2017 +0800
----------------------------------------------------------------------
runners/flink/pom.xml | 1 -
.../streaming/state/FlinkStateInternals.java | 205 ++++++++++++++++++-
2 files changed, 202 insertions(+), 4 deletions(-)
----------------------------------------------------------------------