You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/23 16:06:28 UTC
[2/2] flink git commit: [FLINK-4856] Add MapState for keyed state
[FLINK-4856] Add MapState for keyed state
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c9e2b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c9e2b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c9e2b6
Branch: refs/heads/master
Commit: 30c9e2b683bf7f4776ffc23b6a860946a4429ae5
Parents: de2605e
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Authored: Fri Feb 17 11:19:18 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Feb 23 16:56:29 2017 +0100
----------------------------------------------------------------------
docs/dev/stream/state.md | 8 +-
.../streaming/state/AbstractRocksDBState.java | 49 +-
.../state/RocksDBKeyedStateBackend.java | 10 +
.../streaming/state/RocksDBMapState.java | 546 +++++++++++++++++++
.../api/common/functions/RuntimeContext.java | 42 ++
.../util/AbstractRuntimeUDFContext.java | 9 +
.../flink/api/common/state/KeyedStateStore.java | 40 ++
.../apache/flink/api/common/state/MapState.java | 134 +++++
.../api/common/state/MapStateDescriptor.java | 147 +++++
.../flink/api/common/state/StateBinder.java | 9 +
.../flink/api/common/state/StateDescriptor.java | 2 +-
.../common/typeutils/base/MapSerializer.java | 193 +++++++
.../flink/api/java/typeutils/MapTypeInfo.java | 147 +++++
.../common/state/MapStateDescriptorTest.java | 115 ++++
.../typeutils/base/MapSerializerTest.java | 90 +++
.../flink/hdfstests/FileStateBackendTest.java | 4 +
.../netty/message/KvStateRequestSerializer.java | 67 +++
.../state/AbstractKeyedStateBackend.java | 23 +-
.../runtime/state/DefaultKeyedStateStore.java | 14 +
.../flink/runtime/state/HashMapSerializer.java | 193 +++++++
.../flink/runtime/state/UserFacingMapState.java | 103 ++++
.../state/heap/HeapKeyedStateBackend.java | 18 +-
.../flink/runtime/state/heap/HeapMapState.java | 311 +++++++++++
.../state/internal/InternalMapState.java | 32 ++
.../message/KvStateRequestSerializerTest.java | 131 +++++
.../runtime/state/FileStateBackendTest.java | 4 +
.../runtime/state/MemoryStateBackendTest.java | 4 +
.../runtime/state/SerializationProxiesTest.java | 3 +-
.../runtime/state/StateBackendTestBase.java | 299 ++++++++++
.../api/functions/async/RichAsyncFunction.java | 7 +
.../api/operators/StreamingRuntimeContext.java | 9 +
.../functions/async/RichAsyncFunctionTest.java | 8 +-
.../operators/StreamingRuntimeContextTest.java | 85 ++-
.../KVStateRequestSerializerRocksDBTest.java | 40 ++
34 files changed, 2887 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index e554e29..40522e1 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -118,6 +118,11 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe
of elements that are added to the state. The interface is the same as for `ListState` but elements
added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
+* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
+retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
+`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
+views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
+
All types of state also have a method `clear()` that clears the state for the currently
active key, i.e. the key of the input element.
@@ -136,7 +141,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na
that you can reference them), the type of the values that the state holds, and possibly
a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
-a `ReducingStateDescriptor` or a `FoldingStateDescriptor`.
+a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.
State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for
@@ -147,6 +152,7 @@ is available in a `RichFunction` has these methods for accessing state:
* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
* `ListState<T> getListState(ListStateDescriptor<T>)`
* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
+* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
This is an example `FlatMapFunction` that shows how all of the parts fit together:
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 89f41aa..569971a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -21,7 +21,10 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
@@ -50,7 +53,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
implements InternalKvState<N>, State {
/** Serializer for the namespace */
- private final TypeSerializer<N> namespaceSerializer;
+ final TypeSerializer<N> namespaceSerializer;
/** The current namespace, which the next value methods will refer to */
private N currentNamespace;
@@ -215,4 +218,48 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
value >>>= 8;
} while (value != 0);
}
+
+ protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
+ int keyGroup = readKeyGroup(inputView);
+ K key = readKey(inputStream, inputView);
+ N namespace = readNamespace(inputStream, inputView);
+
+ return new Tuple3<>(keyGroup, key, namespace);
+ }
+
+ private int readKeyGroup(DataInputView inputView) throws IOException {
+ int keyGroup = 0;
+ for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) {
+ keyGroup <<= 8;
+ keyGroup |= (inputView.readByte() & 0xFF);
+ }
+ return keyGroup;
+ }
+
+ private K readKey(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
+ int beforeRead = inputStream.getPosition();
+ K key = backend.getKeySerializer().deserialize(inputView);
+ if (ambiguousKeyPossible) {
+ int length = inputStream.getPosition() - beforeRead;
+ readVariableIntBytes(inputView, length);
+ }
+ return key;
+ }
+
+ private N readNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
+ int beforeRead = inputStream.getPosition();
+ N namespace = namespaceSerializer.deserialize(inputView);
+ if (ambiguousKeyPossible) {
+ int length = inputStream.getPosition() - beforeRead;
+ readVariableIntBytes(inputView, length);
+ }
+ return namespace;
+ }
+
+ private void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
+ do {
+ inputView.readByte();
+ value >>>= 8;
+ } while (value != 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d8d77b6..a0efe78 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -53,6 +54,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.SerializableObject;
@@ -882,6 +884,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
+ @Override
+ protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
+ MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+
+ return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
+ }
+
/**
* Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator.
* Used by #MergeIterator.
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
new file mode 100644
index 0000000..e9e9d9b
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * <p>
+ * <p>Each mapping is stored as a separate RocksDB entry whose raw key has the format
+ * #KeyGroup#Key#Namespace#UserKey, so all mappings under the same key and namespace share a
+ * common prefix and are read with a prefix scan. This class does not use {@code merge()}.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+ extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
+ implements InternalMapState<N, UK, UV> {
+
+ private static Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
+
+ /** Serializer for the keys and values */
+ private final TypeSerializer<UK> userKeySerializer;
+ private final TypeSerializer<UV> userValueSerializer;
+
+ /**
+ * We disable writes to the write-ahead-log here. We can't have these in the base class
+ * because JNI segfaults for some reason if they are.
+ */
+ private final WriteOptions writeOptions;
+
+ /**
+ * Creates a new {@code RocksDBMapState}.
+ *
+ * @param namespaceSerializer The serializer for the namespace.
+ * @param stateDesc The state identifier for the state.
+ */
+ public RocksDBMapState(ColumnFamilyHandle columnFamily,
+ TypeSerializer<N> namespaceSerializer,
+ MapStateDescriptor<UK, UV> stateDesc,
+ RocksDBKeyedStateBackend<K> backend) {
+
+ super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+ this.userKeySerializer = stateDesc.getKeySerializer();
+ this.userValueSerializer = stateDesc.getValueSerializer();
+
+ writeOptions = new WriteOptions();
+ writeOptions.setDisableWAL(true);
+ }
+
+ // ------------------------------------------------------------------------
+ // MapState Implementation
+ // ------------------------------------------------------------------------
+
+ @Override
+ public UV get(UK userKey) throws IOException, RocksDBException {
+ byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
+
+ return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
+ }
+
+ @Override
+ public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
+
+ byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawValueBytes = serializeUserValue(userValue);
+
+ backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
+ }
+
+ @Override
+ public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
+ if (map == null) {
+ return;
+ }
+
+ for (Map.Entry<UK, UV> entry : map.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void remove(UK userKey) throws IOException, RocksDBException {
+ byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+
+ backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
+ }
+
+ @Override
+ public boolean contains(UK userKey) throws IOException, RocksDBException {
+ byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
+
+ return (rawValueBytes != null);
+ }
+
+ @Override
+ public int size() throws IOException, RocksDBException {
+ Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+ int count = 0;
+ while (iterator.hasNext()) {
+ count++;
+ iterator.next();
+ }
+
+ return count;
+ }
+
+ @Override
+ public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
+ final Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+ // Return null to make the behavior consistent with other states.
+ if (!iterator.hasNext()) {
+ return null;
+ } else {
+ return new Iterable<Map.Entry<UK, UV>>() {
+ @Override
+ public Iterator<Map.Entry<UK, UV>> iterator() {
+ return iterator;
+ }
+ };
+ }
+ }
+
+ @Override
+ public Iterable<UK> keys() throws IOException, RocksDBException {
+ final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+
+ return new Iterable<UK>() {
+ @Override
+ public Iterator<UK> iterator() {
+ return new RocksDBMapIterator<UK>(backend.db, prefixBytes) {
+ @Override
+ public UK next() {
+ RocksDBMapEntry entry = nextEntry();
+ return (entry == null ? null : entry.getKey());
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public Iterable<UV> values() throws IOException, RocksDBException {
+ final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+
+ return new Iterable<UV>() {
+ @Override
+ public Iterator<UV> iterator() {
+ return new RocksDBMapIterator<UV>(backend.db, prefixBytes) {
+ @Override
+ public UV next() {
+ RocksDBMapEntry entry = nextEntry();
+ return (entry == null ? null : entry.getValue());
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
+ final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+
+ return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes) {
+ @Override
+ public Map.Entry<UK, UV> next() {
+ return nextEntry();
+ }
+ };
+ }
+
+ @Override
+ public void clear() {
+ try {
+ Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+ while (iterator.hasNext()) {
+ iterator.next();
+ iterator.remove();
+ }
+ } catch (Exception e) {
+ LOG.warn("Error while cleaning the state.", e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+ Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+ //TODO make KvStateRequestSerializer key-group aware to save this round trip and key-group computation
+ Tuple2<K, N> des = KvStateRequestSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace,
+ backend.getKeySerializer(),
+ namespaceSerializer);
+
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
+
+ ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
+ DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
+ writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView);
+ final byte[] keyPrefixBytes = outputStream.toByteArray();
+
+ final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) {
+ @Override
+ public Map.Entry<UK, UV> next() {
+ return nextEntry();
+ }
+ };
+
+ // Return null to make the behavior consistent with other backends
+ if (!iterator.hasNext()) {
+ return null;
+ }
+
+ return KvStateRequestSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() {
+ @Override
+ public Iterator<Map.Entry<UK, UV>> iterator() {
+ return iterator;
+ }
+ }, userKeySerializer, userValueSerializer);
+ }
+
+ // ------------------------------------------------------------------------
+ // Serialization Methods
+ // ------------------------------------------------------------------------
+
+ private byte[] serializeCurrentKeyAndNamespace() throws IOException {
+ writeCurrentKeyWithGroupAndNamespace();
+
+ return keySerializationStream.toByteArray();
+ }
+
+ private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
+ writeCurrentKeyWithGroupAndNamespace();
+ userKeySerializer.serialize(userKey, keySerializationDataOutputView);
+
+ return keySerializationStream.toByteArray();
+ }
+
+ private byte[] serializeUserValue(UV userValue) throws IOException {
+ keySerializationStream.reset();
+ // Value layout: a boolean null-marker (true = null), followed by the serialized value only when non-null.
+ if (userValue == null) {
+ keySerializationDataOutputView.writeBoolean(true);
+ } else {
+ keySerializationDataOutputView.writeBoolean(false);
+ userValueSerializer.serialize(userValue, keySerializationDataOutputView);
+ }
+
+
+ return keySerializationStream.toByteArray();
+ }
+
+ private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException {
+ ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
+ DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+
+ readKeyWithGroupAndNamespace(bais, in);
+
+ return userKeySerializer.deserialize(in);
+ }
+
+ private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
+ ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
+ DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+
+ boolean isNull = in.readBoolean();
+
+ return isNull ? null : userValueSerializer.deserialize(in);
+ }
+
+ // ------------------------------------------------------------------------
+ // Internal Classes
+ // ------------------------------------------------------------------------
+
+ /** A map entry in RocksDBMapState */
+ private class RocksDBMapEntry implements Map.Entry<UK, UV> {
+ private final RocksDB db;
+
+ /** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB
+ * with the format #KeyGroup#Key#Namespace#UserKey. */
+ private final byte[] rawKeyBytes;
+
+ /** The raw bytes of the value stored in RocksDB */
+ private byte[] rawValueBytes;
+
+ /** True if the entry has been deleted. */
+ private boolean deleted;
+
+ /** The user key and value. The deserialization is performed lazily, i.e. the key
+ * and the value are deserialized only when they are accessed. */
+ private UK userKey = null;
+ private UV userValue = null;
+
+ RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) {
+ this.db = db;
+
+ this.rawKeyBytes = rawKeyBytes;
+ this.rawValueBytes = rawValueBytes;
+ this.deleted = false;
+ }
+
+ public void remove() {
+ deleted = true;
+ rawValueBytes = null;
+
+ try {
+ db.remove(columnFamily, writeOptions, rawKeyBytes);
+ } catch (RocksDBException e) {
+ throw new RuntimeException("Error while removing data from RocksDB.", e);
+ }
+ }
+
+ @Override
+ public UK getKey() {
+ if (userKey == null) {
+ try {
+ userKey = deserializeUserKey(rawKeyBytes);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while deserializing the user key.");
+ }
+ }
+
+ return userKey;
+ }
+
+ @Override
+ public UV getValue() {
+ if (deleted) {
+ return null;
+ } else {
+ if (userValue == null) {
+ try {
+ userValue = deserializeUserValue(rawValueBytes);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while deserializing the user value.");
+ }
+ }
+
+ return userValue;
+ }
+ }
+
+ @Override
+ public UV setValue(UV value) {
+ if (deleted) {
+ throw new IllegalStateException("The value has already been deleted.");
+ }
+
+ UV oldValue = getValue();
+
+ try {
+ userValue = value;
+ rawValueBytes = serializeUserValue(value);
+
+ db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
+ } catch (IOException | RocksDBException e) {
+ throw new RuntimeException("Error while putting data into RocksDB.", e);
+ }
+
+ return oldValue;
+ }
+ }
+
+ /** An auxiliary utility to scan all entries under the given key. */
+ private abstract class RocksDBMapIterator<T> implements Iterator<T> {
+
+ final static int CACHE_SIZE_BASE = 1;
+ final static int CACHE_SIZE_LIMIT = 128;
+
+ /** The db where data resides. */
+ private final RocksDB db;
+
+ /**
+ * The prefix bytes of the key being accessed. All entries under the same key
+ * have the same prefix, hence we can stop iterating once we come across an
+ * entry with a different prefix.
+ */
+ private final byte[] keyPrefixBytes;
+
+ /**
+ * True if all entries have been accessed or the iterator has come across an
+ * entry with a different prefix.
+ */
+ private boolean expired = false;
+
+ /** An in-memory cache for the entries read from RocksDB. */
+ private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
+ private int cacheIndex = 0;
+
+
+ RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) {
+ this.db = db;
+ this.keyPrefixBytes = keyPrefixBytes;
+ }
+
+ @Override
+ public boolean hasNext() {
+ loadCache();
+
+ return (cacheIndex < cacheEntries.size());
+ }
+
+ @Override
+ public void remove() {
+ if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
+ throw new IllegalStateException("The remove operation must be called after an valid next operation.");
+ }
+ // NOTE(review): the entry is taken from the current cache; a hasNext() that reloads the cache between next() and remove() resets cacheIndex to 0, so this call then throws.
+ RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
+ lastEntry.remove();
+ }
+
+ final RocksDBMapEntry nextEntry() {
+ loadCache();
+
+ if (cacheIndex == cacheEntries.size()) {
+ if (!expired) {
+ throw new IllegalStateException();
+ }
+
+ return null;
+ }
+
+ RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+ cacheIndex++;
+
+ return entry;
+ }
+
+ /** Refills the cache with the next batch of entries under the key prefix; does nothing while unread entries remain or the scan has expired. */
+ private void loadCache() {
+ if (cacheIndex > cacheEntries.size()) {
+ throw new IllegalStateException();
+ }
+
+ // Load cache entries only when the cache is empty and there still exist unread entries
+ if (cacheIndex < cacheEntries.size() || expired) {
+ return;
+ }
+
+ RocksIterator iterator = db.newIterator(columnFamily);
+ try { // the native iterator must be released even if the scan below throws
+ /*
+ * The iteration starts from the prefix bytes at the first loading. The cache then is
+ * reloaded when the next entry to return is the last one in the cache. At that time,
+ * we will start the iterating from the last returned entry.
+ */
+ RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
+ byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+ int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT));
+
+ cacheEntries.clear();
+ cacheIndex = 0;
+ iterator.seek(startBytes);
+
+ /*
+ * If the last returned entry is not deleted, it will be the first entry in the
+ * iterating. Skip it to avoid redundant access in such cases.
+ */
+ if (lastEntry != null && !lastEntry.deleted) {
+ iterator.next();
+ }
+
+ while (true) {
+ if (!iterator.isValid() || !underSameKey(iterator.key())) {
+ expired = true;
+ break;
+ }
+
+ if (cacheEntries.size() >= numEntries) {
+ break;
+ }
+
+ RocksDBMapEntry entry = new RocksDBMapEntry(db, iterator.key(), iterator.value());
+ cacheEntries.add(entry);
+ iterator.next();
+ }
+ } finally {
+ iterator.close();
+ }
+ }
+
+ private boolean underSameKey(byte[] rawKeyBytes) {
+ if (rawKeyBytes.length < keyPrefixBytes.length) {
+ return false;
+ }
+
+ for (int i = 0; i < keyPrefixBytes.length; ++i) {
+ if (rawKeyBytes[i] != keyPrefixBytes[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 405e390..98ad018 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -31,6 +31,8 @@ import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -387,4 +389,44 @@ public interface RuntimeContext {
*/
@PublicEvolving
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
+
+ /**
+ * Gets a handle to the system's key/value map state. This state is similar to the state
+ * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+ * is composed of user-defined key-value pairs.
+ *
+ * <p>This state is only accessible if the function is executed on a KeyedStream.
+ *
+ * <pre>{@code
+ * DataStream<MyType> stream = ...;
+ * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+ *
+ * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+ *
+ * private MapState<MyType, Long> state;
+ *
+ * public void open(Configuration cfg) {
+ * state = getRuntimeContext().getMapState(
+ * new MapStateDescriptor<>("sum", MyType.class, Long.class));
+ * }
+ *
+ * public Tuple2<MyType, Long> map(MyType value) {
+ * return new Tuple2<>(value, state.get(value));
+ * }
+ * });
+ *
+ * }</pre>
+ *
+ * @param stateProperties The descriptor defining the properties of the state.
+ *
+ * @param <UK> The type of the user keys stored in the state.
+ * @param <UV> The type of the user values stored in the state.
+ *
+ * @return The partitioned state object.
+ *
+ * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+ * function (function is not part of a KeyedStream).
+ */
+ @PublicEvolving
+ <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 0eafeaa..2538799 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -214,4 +216,11 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
+
+ @Override
+ @PublicEvolving
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ throw new UnsupportedOperationException(
+ "This state is only accessible by functions executed on a KeyedStream");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index bbb4c67..2187f6c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -196,4 +196,44 @@ public interface KeyedStateStore {
*/
@PublicEvolving
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
+
+ /**
+ * Gets a handle to the system's key/value map state. This state is similar to the state
+ * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+ * is composed of user-defined key-value pairs.
+ *
+ * <p>This state is only accessible if the function is executed on a KeyedStream.
+ *
+ * <pre>{@code
+ * DataStream<MyType> stream = ...;
+ * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+ *
+ * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+ *
+ * private MapState<MyType, Long> state;
+ *
+ * public void open(Configuration cfg) {
+ * state = getRuntimeContext().getMapState(
+ * new MapStateDescriptor<>("sum", MyType.class, Long.class));
+ * }
+ *
+ * public Tuple2<MyType, Long> map(MyType value) {
+ * return new Tuple2<>(value, state.get(value));
+ * }
+ * });
+ *
+ * }</pre>
+ *
+ * @param stateProperties The descriptor defining the properties of the state.
+ *
+ * @param <UK> The type of the user keys stored in the state.
+ * @param <UV> The type of the user values stored in the state.
+ *
+ * @return The partitioned state object.
+ *
+ * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+ * function (function is not part of a KeyedStream).
+ */
+ @PublicEvolving
+ <UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
new file mode 100644
index 0000000..fa657ef
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link State} interface for partitioned key-value state. The key-value pair can be
+ * added, updated and retrieved.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ *
+ * <p>The state is only accessible by functions applied on a KeyedStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
+ *
+ * @param <UK> Type of the keys in the state.
+ * @param <UV> Type of the values in the state.
+ */
+@PublicEvolving
+public interface MapState<UK, UV> extends State {
+
+ /**
+ * Returns the current value associated with the given key.
+ *
+ * @param key The key of the mapping
+ * @return The value of the mapping with the given key
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ UV get(UK key) throws Exception;
+
+ /**
+ * Associates a new value with the given key.
+ *
+ * @param key The key of the mapping
+ * @param value The new value of the mapping
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ void put(UK key, UV value) throws Exception;
+
+ /**
+ * Copies all of the mappings from the given map into the state.
+ *
+ * @param map The mappings to be stored in this state
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ void putAll(Map<UK, UV> map) throws Exception;
+
+ /**
+ * Deletes the mapping of the given key.
+ *
+ * @param key The key of the mapping
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ void remove(UK key) throws Exception;
+
+ /**
+ * Returns whether a mapping exists for the given key.
+ *
+ * @param key The key of the mapping
+ * @return True if there exists a mapping whose key equals the given key
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ boolean contains(UK key) throws Exception;
+
+ /**
+ * @return The number of mappings in the state.
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ int size() throws Exception;
+
+ /**
+ * Returns all the mappings in the state.
+ *
+ * @return An iterable view of all the key-value pairs in the state.
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ Iterable<Map.Entry<UK, UV>> entries() throws Exception;
+
+ /**
+ * Returns all the keys in the state.
+ *
+ * @return An iterable view of all the keys in the state.
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ Iterable<UK> keys() throws Exception;
+
+ /**
+ * Returns all the values in the state.
+ *
+ * @return An iterable view of all the values in the state.
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ Iterable<UV> values() throws Exception;
+
+ /**
+ * Iterates over all the mappings in the state.
+ *
+ * @return An iterator over all the mappings in the state
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
new file mode 100644
index 0000000..d4a49f8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+
+import java.util.Map;
+
+/**
+ * A {@link StateDescriptor} for {@link MapState}. This can be used to create state where the type
+ * is a map that can be updated and iterated over.
+ *
+ * <p>Using {@code MapState} is typically more efficient than manually maintaining a map in a
+ * {@link ValueState}, because the backing implementation can support efficient updates, rather than
+ * replacing the full map on write.
+ *
+ * <p>To create keyed map state (on a KeyedStream), use
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
+ *
+ * @param <UK> The type of the keys that can be added to the map state.
+ */
+@PublicEvolving
+public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
+
+ /**
+ * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
+ *
+ * @param name The name of the {@code MapStateDescriptor}.
+ * @param keySerializer The type serializer for the keys in the state.
+ * @param valueSerializer The type serializer for the values in the state.
+ */
+ public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
+ super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
+ }
+
+ /**
+ * Create a new {@code MapStateDescriptor} with the given name and the given type information.
+ *
+ * @param name The name of the {@code MapStateDescriptor}.
+ * @param keyTypeInfo The type information for the keys in the state.
+ * @param valueTypeInfo The type information for the values in the state.
+ */
+ public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
+ super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
+ }
+
+ /**
+ * Create a new {@code MapStateDescriptor} with the given name and the given type information.
+ *
+ * <p>If this constructor fails (because it is not possible to describe the type via a class),
+ * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
+ *
+ * @param name The name of the {@code MapStateDescriptor}.
+ * @param keyClass The class of the type of keys in the state.
+ * @param valueClass The class of the type of values in the state.
+ */
+ public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
+ super(name, new MapTypeInfo<>(keyClass, valueClass), null);
+ }
+
+ @Override
+ public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception {
+ return stateBinder.createMapState(this);
+ }
+
+ @Override
+ public Type getType() {
+ return Type.MAP;
+ }
+
+ /**
+ * Gets the serializer for the keys in the state.
+ *
+ * @return The serializer for the keys in the state.
+ */
+ public TypeSerializer<UK> getKeySerializer() {
+ final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
+ if (!(rawSerializer instanceof MapSerializer)) {
+ throw new IllegalStateException("Unexpected serializer type.");
+ }
+
+ return ((MapSerializer<UK, UV>) rawSerializer).getKeySerializer();
+ }
+
+ /**
+ * Gets the serializer for the values in the state.
+ *
+ * @return The serializer for the values in the state.
+ */
+ public TypeSerializer<UV> getValueSerializer() {
+ final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
+ if (!(rawSerializer instanceof MapSerializer)) {
+ throw new IllegalStateException("Unexpected serializer type.");
+ }
+
+ return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
+ }
+
+ @Override
+ public int hashCode() {
+ int result = serializer.hashCode();
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o;
+ return serializer.equals(that.serializer) && name.equals(that.name);
+ }
+
+ @Override
+ public String toString() {
+ return "MapStateDescriptor{" +
+ "name=" + name +
+ ", serializer=" + serializer +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
index 08dfc90..9df7a47 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -70,4 +70,13 @@ public interface StateBinder {
* @param <ACC> Type of the value in the state
*/
<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+ /**
+ * Creates and returns a new {@link MapState}.
+ * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+ *
+ * @param <MK> Type of the keys in the state
+ * @param <MV> Type of the values in the state
+ */
+ <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 332e649..a52ea32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -55,7 +55,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
*/
// IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization
public enum Type {
- @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING
+ @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP
}
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
new file mode 100644
index 0000000..5e1a3bf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * A serializer for {@link Map}. The serializer relies on a key serializer and a value serializer
+ * for the serialization of the map's key-value pairs.
+ *
+ * <p>The serialization format for the map is as follows: four bytes for the length of the map,
+ * followed by the serialized representation of each key-value pair. To allow null values, each value
+ * is prefixed by a null marker.
+ *
+ * @param <K> The type of the keys in the map.
+ * @param <V> The type of the values in the map.
+ */
+public class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
+
+ private static final long serialVersionUID = -6885593032367050078L;
+
+ /** The serializer for the keys in the map */
+ private final TypeSerializer<K> keySerializer;
+
+ /** The serializer for the values in the map */
+ private final TypeSerializer<V> valueSerializer;
+
+ /**
+ * Creates a map serializer that uses the given serializers to serialize the key-value pairs in the map.
+ *
+ * @param keySerializer The serializer for the keys in the map
+ * @param valueSerializer The serializer for the values in the map
+ */
+ public MapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
+ this.keySerializer = Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null");
+ this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null.");
+ }
+
+ // ------------------------------------------------------------------------
+ // MapSerializer specific properties
+ // ------------------------------------------------------------------------
+
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ public TypeSerializer<V> getValueSerializer() {
+ return valueSerializer;
+ }
+
+ // ------------------------------------------------------------------------
+ // Type Serializer implementation
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Map<K, V>> duplicate() {
+ TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
+ TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
+
+ return new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
+ }
+
+ @Override
+ public Map<K, V> createInstance() {
+ return new HashMap<>();
+ }
+
+ @Override
+ public Map<K, V> copy(Map<K, V> from) {
+ Map<K, V> newMap = new HashMap<>(from.size());
+
+ for (Map.Entry<K, V> entry : from.entrySet()) {
+ K newKey = keySerializer.copy(entry.getKey());
+ V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue());
+
+ newMap.put(newKey, newValue);
+ }
+
+ return newMap;
+ }
+
+ @Override
+ public Map<K, V> copy(Map<K, V> from, Map<K, V> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1; // var length
+ }
+
+ @Override
+ public void serialize(Map<K, V> map, DataOutputView target) throws IOException {
+ final int size = map.size();
+ target.writeInt(size);
+
+ for (Map.Entry<K, V> entry : map.entrySet()) {
+ keySerializer.serialize(entry.getKey(), target);
+
+ if (entry.getValue() == null) {
+ target.writeBoolean(true);
+ } else {
+ target.writeBoolean(false);
+ valueSerializer.serialize(entry.getValue(), target);
+ }
+ }
+ }
+
+ @Override
+ public Map<K, V> deserialize(DataInputView source) throws IOException {
+ final int size = source.readInt();
+
+ final Map<K, V> map = new HashMap<>(size);
+ for (int i = 0; i < size; ++i) {
+ K key = keySerializer.deserialize(source);
+
+ boolean isNull = source.readBoolean();
+ V value = isNull ? null : valueSerializer.deserialize(source);
+
+ map.put(key, value);
+ }
+
+ return map;
+ }
+
+ @Override
+ public Map<K, V> deserialize(Map<K, V> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ final int size = source.readInt();
+ target.writeInt(size);
+
+ for (int i = 0; i < size; ++i) {
+ keySerializer.copy(source, target);
+
+ boolean isNull = source.readBoolean();
+ target.writeBoolean(isNull);
+
+ if (!isNull) {
+ valueSerializer.copy(source, target);
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj != null && obj.getClass() == getClass() &&
+ keySerializer.equals(((MapSerializer<?, ?>) obj).getKeySerializer()) &&
+ valueSerializer.equals(((MapSerializer<?, ?>) obj).getValueSerializer()));
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return (obj != null && obj.getClass() == getClass());
+ }
+
+ @Override
+ public int hashCode() {
+ return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java
new file mode 100644
index 0000000..ca04e0c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@code TypeInformation} used by {@link org.apache.flink.api.common.state.MapStateDescriptor}.
+ *
+ * @param <K> The type of the keys in the map.
+ * @param <V> The type of the values in the map.
+ */
+@PublicEvolving
+public class MapTypeInfo<K, V> extends TypeInformation<Map<K, V>> {
+
+ /* The type information for the keys in the map */
+ private final TypeInformation<K> keyTypeInfo;
+
+ /* The type information for the values in the map */
+ private final TypeInformation<V> valueTypeInfo;
+
+ public MapTypeInfo(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo) {
+ this.keyTypeInfo = Preconditions.checkNotNull(keyTypeInfo, "The key type information cannot be null.");
+ this.valueTypeInfo = Preconditions.checkNotNull(valueTypeInfo, "The value type information cannot be null.");
+ }
+
+ public MapTypeInfo(Class<K> keyClass, Class<V> valueClass) {
+ this.keyTypeInfo = of(checkNotNull(keyClass, "The key class cannot be null."));
+ this.valueTypeInfo = of(checkNotNull(valueClass, "The value class cannot be null."));
+ }
+
+ // ------------------------------------------------------------------------
+ // MapTypeInfo specific properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the type information for the keys in the map.
+ */
+ public TypeInformation<K> getKeyTypeInfo() {
+ return keyTypeInfo;
+ }
+
+ /**
+ * Gets the type information for the values in the map.
+ */
+ public TypeInformation<V> getValueTypeInfo() {
+ return valueTypeInfo;
+ }
+
+ // ------------------------------------------------------------------------
+ // TypeInformation implementation
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 0;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 2;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<Map<K, V>> getTypeClass() {
+ return (Class<Map<K, V>>)(Class<?>)Map.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Map<K, V>> createSerializer(ExecutionConfig config) {
+ TypeSerializer<K> keyTypeSerializer = keyTypeInfo.createSerializer(config);
+ TypeSerializer<V> valueTypeSerializer = valueTypeInfo.createSerializer(config);
+
+ return new MapSerializer<>(keyTypeSerializer, valueTypeSerializer);
+ }
+
+ @Override
+ public String toString() {
+ return "Map<" + keyTypeInfo + ", " + valueTypeInfo + ">";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof MapTypeInfo) {
+ @SuppressWarnings("unchecked")
+ MapTypeInfo<K, V> other = (MapTypeInfo<K, V>) obj;
+
+ return (other.canEqual(this) &&
+ keyTypeInfo.equals(other.keyTypeInfo) && valueTypeInfo.equals(other.valueTypeInfo));
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * keyTypeInfo.hashCode() + valueTypeInfo.hashCode();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return (obj != null && obj.getClass() == getClass());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
new file mode 100644
index 0000000..9d1b105
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MapStateDescriptorTest {
+
+ @Test
+ public void testMapStateDescriptorEagerSerializer() throws Exception {
+
+ TypeSerializer<Integer> keySerializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());
+ TypeSerializer<String> valueSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+
+ MapStateDescriptor<Integer, String> descr =
+ new MapStateDescriptor<>("testName", keySerializer, valueSerializer);
+
+ assertEquals("testName", descr.getName());
+ assertNotNull(descr.getSerializer());
+ assertTrue(descr.getSerializer() instanceof MapSerializer);
+ assertNotNull(descr.getKeySerializer());
+ assertEquals(keySerializer, descr.getKeySerializer());
+ assertNotNull(descr.getValueSerializer());
+ assertEquals(valueSerializer, descr.getValueSerializer());
+
+ MapStateDescriptor<Integer, String> copy = CommonTestUtils.createCopySerializable(descr);
+
+ assertEquals("testName", copy.getName());
+ assertNotNull(copy.getSerializer());
+ assertTrue(copy.getSerializer() instanceof MapSerializer);
+
+ assertNotNull(copy.getKeySerializer());
+ assertEquals(keySerializer, copy.getKeySerializer());
+ assertNotNull(copy.getValueSerializer());
+ assertEquals(valueSerializer, copy.getValueSerializer());
+ }
+
+ @Test
+ public void testMapStateDescriptorLazySerializer() throws Exception {
+ // some different registered value
+ ExecutionConfig cfg = new ExecutionConfig();
+ cfg.registerKryoType(TaskInfo.class);
+
+ MapStateDescriptor<Path, String> descr =
+ new MapStateDescriptor<>("testName", Path.class, String.class);
+
+ try {
+ descr.getSerializer();
+ fail("should cause an exception");
+ } catch (IllegalStateException ignored) {}
+
+ descr.initializeSerializerUnlessSet(cfg);
+
+ assertNotNull(descr.getSerializer());
+ assertTrue(descr.getSerializer() instanceof MapSerializer);
+
+ assertNotNull(descr.getKeySerializer());
+ assertTrue(descr.getKeySerializer() instanceof KryoSerializer);
+
+ assertTrue(((KryoSerializer<?>) descr.getKeySerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+
+ assertNotNull(descr.getValueSerializer());
+ assertTrue(descr.getValueSerializer() instanceof StringSerializer);
+ }
+
+ @Test
+ public void testMapStateDescriptorAutoSerializer() throws Exception {
+
+ MapStateDescriptor<String, Long> descr =
+ new MapStateDescriptor<>("testName", String.class, Long.class);
+
+ MapStateDescriptor<String, Long> copy = CommonTestUtils.createCopySerializable(descr);
+
+ assertEquals("testName", copy.getName());
+
+ assertNotNull(copy.getSerializer());
+ assertTrue(copy.getSerializer() instanceof MapSerializer);
+
+ assertNotNull(copy.getKeySerializer());
+ assertEquals(StringSerializer.INSTANCE, copy.getKeySerializer());
+ assertNotNull(copy.getValueSerializer());
+ assertEquals(LongSerializer.INSTANCE, copy.getValueSerializer());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java
new file mode 100644
index 0000000..9ce7de1
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+/**
+ * A test for the {@link MapSerializer}.
+ *
+ * <p>The test data consists of empty, single-entry, randomly filled and null-valued maps, using
+ * {@link Collections} views, {@link HashMap} and {@link TreeMap} as backing implementations.
+ */
+public class MapSerializerTest extends SerializerTestBase<Map<Long, String>> {
+
+    /** Exclusive upper bound for the number of insertions into the randomly filled maps. */
+    private static final int MAX_RANDOM_ENTRIES = 200;
+
+    @Override
+    protected TypeSerializer<Map<Long, String>> createSerializer() {
+        return new MapSerializer<>(LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+    }
+
+    @Override
+    protected int getLength() {
+        // -1: the serialized size of a map is not fixed
+        return -1;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Class<Map<Long, String>> getTypeClass() {
+        // double cast is needed because Map.class is a raw Class<Map>
+        return (Class<Map<Long, String>>) (Class<?>) Map.class;
+    }
+
+    /**
+     * Builds the maps that are run through the serializer tests.
+     *
+     * @return the test maps; the content is deterministic because the random generator is seeded
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    protected Map<Long, String>[] getTestData() {
+        final Random rnd = new Random(123654789);
+
+        // empty maps
+        final Map<Long, String> map1 = Collections.emptyMap();
+        final Map<Long, String> map2 = new HashMap<>();
+        final Map<Long, String> map3 = new TreeMap<>();
+
+        // single element maps
+        final Map<Long, String> map4 = Collections.singletonMap(0L, "hello");
+        final Map<Long, String> map5 = new HashMap<>();
+        map5.put(12345L, "12345L");
+        final Map<Long, String> map6 = new TreeMap<>();
+        map6.put(777888L, "777888L");
+
+        // longer maps
+        final Map<Long, String> map7 = new HashMap<>();
+        fillRandomly(map7, rnd);
+
+        final Map<Long, String> map8 = new TreeMap<>();
+        fillRandomly(map8, rnd);
+
+        // null-value maps
+        final Map<Long, String> map9 = Collections.singletonMap(0L, null);
+        final Map<Long, String> map10 = new HashMap<>();
+        map10.put(999L, null);
+        final Map<Long, String> map11 = new TreeMap<>();
+        map11.put(666L, null);
+
+        return (Map<Long, String>[]) new Map[] {
+            map1, map2, map3, map4, map5, map6, map7, map8, map9, map10, map11
+        };
+    }
+
+    /**
+     * Inserts a random number (below {@link #MAX_RANDOM_ENTRIES}) of random entries into the map.
+     *
+     * <p>The number of insertions is drawn exactly once. Drawing it inside the loop condition
+     * would compare the index against a fresh random bound on every iteration and stop at the
+     * first small draw, which yields far fewer entries than intended.
+     *
+     * @param map the map to fill
+     * @param rnd the source of randomness
+     */
+    private static void fillRandomly(Map<Long, String> map, Random rnd) {
+        final int numEntries = rnd.nextInt(MAX_RANDOM_ENTRIES);
+        for (int i = 0; i < numEntries; i++) {
+            map.put(rnd.nextLong(), Long.toString(rnd.nextLong()));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 109d152..7f8eea8 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -118,6 +118,10 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
@Override
@Test
public void testReducingStateRestoreWithWrongSerializers() {}
+
+ // NOTE(review): overridden with an empty body, so the inherited test of the same name checks
+ // nothing for this backend (same pattern as testReducingStateRestoreWithWrongSerializers above).
+ // The reason is not visible here - confirm that skipping the check is intended.
+ @Override
+ @Test
+ public void testMapStateRestoreWithWrongSerializers() {}
@Test
public void testStateOutputStream() {
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 2f32861..bc830e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -36,7 +36,9 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Serialization and deserialization of messages exchanged between
@@ -484,6 +486,71 @@ public final class KvStateRequestSerializer {
return null;
}
}
+
+    /**
+     * Serializes all entries of the given Iterable with the given key and value serializers.
+     *
+     * <p>Every entry is written as its serialized key, followed by a boolean null marker
+     * (<code>true</code> if the value is <code>null</code>) and, only for non-null values, the
+     * serialized value. No entry count is written.
+     *
+     * @param entries         Key-value pairs to serialize
+     * @param keySerializer   Serializer for UK
+     * @param valueSerializer Serializer for UV
+     * @param <UK>            Type of the keys
+     * @param <UV>            Type of the values
+     * @return Copy of the buffer holding the serialized entries, or <code>null</code> if
+     * <code>entries</code> is <code>null</code>
+     * @throws IOException On failure during serialization
+     */
+    public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+        if (entries == null) {
+            return null;
+        }
+
+        final DataOutputSerializer out = new DataOutputSerializer(32);
+
+        for (Map.Entry<UK, UV> entry : entries) {
+            keySerializer.serialize(entry.getKey(), out);
+
+            // the marker tells the reader whether a value follows
+            final UV value = entry.getValue();
+            final boolean isNull = (value == null);
+            out.writeBoolean(isNull);
+            if (!isNull) {
+                valueSerializer.serialize(value, out);
+            }
+        }
+
+        return out.getCopyOfBuffer();
+    }
+
+    /**
+     * Deserializes all key-value pairs with the given serializers.
+     *
+     * <p>Reads records of the form (key, boolean null marker, value if the marker is
+     * <code>false</code>) for as long as the input reports available bytes, and collects them
+     * in a {@link HashMap}.
+     *
+     * @param serializedValue Serialized value of type {@code Map<UK, UV>}
+     * @param keySerializer   Serializer for UK
+     * @param valueSerializer Serializer for UV
+     * @param <UK>            Type of the key
+     * @param <UV>            Type of the value.
+     * @return Deserialized map or <code>null</code> if the serialized value
+     * is <code>null</code>
+     * @throws IOException On failure during deserialization
+     */
+    public static <UK, UV> Map<UK, UV> deserializeMap(byte[] serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+        if (serializedValue == null) {
+            return null;
+        }
+
+        final DataInputDeserializer input =
+            new DataInputDeserializer(serializedValue, 0, serializedValue.length);
+        final Map<UK, UV> entries = new HashMap<>();
+
+        while (input.available() > 0) {
+            final UK key = keySerializer.deserialize(input);
+
+            // a set marker means the value was null and nothing else was written for it
+            UV value = null;
+            if (!input.readBoolean()) {
+                value = valueSerializer.deserialize(input);
+            }
+
+            entries.put(key, value);
+        }
+
+        return entries;
+    }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index fe5d1cc..3ed49f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
@@ -39,6 +41,7 @@ import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.Preconditions;
@@ -189,6 +192,20 @@ public abstract class AbstractKeyedStateBackend<K>
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
/**
+ * Creates and returns a new {@link MapState}.
+ *
+ * @param namespaceSerializer TypeSerializer for the state namespace.
+ * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+ *
+ * @param <N> The type of the namespace.
+ * @param <UK> Type of the keys in the state
+ * @param <UV> Type of the values in the state
+ *
+ * @return The newly created map state, as an {@link InternalMapState}.
+ * @throws Exception Implementations may fail while creating the state.
+ */
+ protected abstract <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+ TypeSerializer<N> namespaceSerializer,
+ MapStateDescriptor<UK, UV> stateDesc) throws Exception;
+
+ /**
* @see KeyedStateBackend
*/
@Override
@@ -285,12 +302,16 @@ public abstract class AbstractKeyedStateBackend<K>
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
}
-
@Override
public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
}
+
+ // Forwards to the enclosing backend's createMapState, passing the namespaceSerializer
+ // that is in scope here together with the given descriptor.
+ @Override
+ public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+ return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
+ }
});
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index d8b8aa8..a32cebd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
@@ -93,6 +95,18 @@ public class DefaultKeyedStateStore implements KeyedStateStore {
}
}
+    /**
+     * Returns the map state for the given descriptor, wrapped in a {@link UserFacingMapState}.
+     *
+     * <p>The descriptor's serializer is initialized with this store's execution config (unless it
+     * is already set) before the state is looked up.
+     *
+     * @param stateProperties The descriptor of the requested state; must not be null.
+     * @return The user-facing wrapper around the partitioned map state.
+     * @throws RuntimeException If initializing the serializer or obtaining the state fails; the
+     *         original exception is attached as the cause.
+     */
+    @Override
+    public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+        requireNonNull(stateProperties, "The state properties must not be null");
+
+        final MapState<UK, UV> backendState;
+        try {
+            stateProperties.initializeSerializerUnlessSet(executionConfig);
+            backendState = getPartitionedState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+
+        return new UserFacingMapState<>(backendState);
+    }
+
private <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
return keyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
new file mode 100644
index 0000000..61cc58c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TypeSerializer} for {@link HashMap} that delegates the handling of the individual
+ * entries to a key serializer and a value serializer.
+ *
+ * <p>Binary layout: the number of entries as an {@code int}, then for every entry the serialized
+ * key, a boolean null marker and - only if the marker is {@code false} - the serialized value.
+ * The marker is what allows {@code null} values to be stored.
+ *
+ * @param <K> The type of the keys in the map.
+ * @param <V> The type of the values in the map.
+ */
+public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
+
+    private static final long serialVersionUID = -6885593032367050078L;
+
+    /** Serializer used for every key of the map. */
+    private final TypeSerializer<K> keySerializer;
+
+    /** Serializer used for every non-null value of the map. */
+    private final TypeSerializer<V> valueSerializer;
+
+    /**
+     * Creates a serializer for hash maps whose entries are handled by the given serializers.
+     *
+     * @param keySerializer The serializer for the keys in the map
+     * @param valueSerializer The serializer for the values in the map
+     */
+    public HashMapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
+        this.keySerializer = Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null");
+        this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null.");
+    }
+
+    // ------------------------------------------------------------------------
+    //  HashMapSerializer specific properties
+    // ------------------------------------------------------------------------
+
+    /** Returns the serializer used for the keys. */
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializer;
+    }
+
+    /** Returns the serializer used for the values. */
+    public TypeSerializer<V> getValueSerializer() {
+        return valueSerializer;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Type Serializer implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    /** Creates a new serializer built from duplicates of the key and value serializers. */
+    @Override
+    public TypeSerializer<HashMap<K, V>> duplicate() {
+        final TypeSerializer<K> keyDuplicate = keySerializer.duplicate();
+        final TypeSerializer<V> valueDuplicate = valueSerializer.duplicate();
+        return new HashMapSerializer<>(keyDuplicate, valueDuplicate);
+    }
+
+    @Override
+    public HashMap<K, V> createInstance() {
+        return new HashMap<>();
+    }
+
+    /**
+     * Copies the map entry by entry; keys and non-null values are copied with their serializers,
+     * {@code null} values are carried over as {@code null}.
+     */
+    @Override
+    public HashMap<K, V> copy(HashMap<K, V> from) {
+        final HashMap<K, V> result = new HashMap<>(from.size());
+
+        for (Map.Entry<K, V> entry : from.entrySet()) {
+            final K keyCopy = keySerializer.copy(entry.getKey());
+
+            final V value = entry.getValue();
+            V valueCopy = null;
+            if (value != null) {
+                valueCopy = valueSerializer.copy(value);
+            }
+
+            result.put(keyCopy, valueCopy);
+        }
+
+        return result;
+    }
+
+    /** The reuse instance is ignored; this behaves exactly like {@link #copy(HashMap)}. */
+    @Override
+    public HashMap<K, V> copy(HashMap<K, V> from, HashMap<K, V> reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        // variable length
+        return -1;
+    }
+
+    /** Writes the entry count followed by (key, null marker, optional value) for every entry. */
+    @Override
+    public void serialize(HashMap<K, V> map, DataOutputView target) throws IOException {
+        target.writeInt(map.size());
+
+        for (Map.Entry<K, V> entry : map.entrySet()) {
+            keySerializer.serialize(entry.getKey(), target);
+
+            final V value = entry.getValue();
+            final boolean isNull = (value == null);
+            target.writeBoolean(isNull);
+            if (!isNull) {
+                valueSerializer.serialize(value, target);
+            }
+        }
+    }
+
+    /** Reads the entry count and then that many (key, null marker, optional value) records. */
+    @Override
+    public HashMap<K, V> deserialize(DataInputView source) throws IOException {
+        final int size = source.readInt();
+        final HashMap<K, V> result = new HashMap<>(size);
+
+        for (int remaining = size; remaining > 0; remaining--) {
+            final K key = keySerializer.deserialize(source);
+
+            V value = null;
+            if (!source.readBoolean()) {
+                value = valueSerializer.deserialize(source);
+            }
+
+            result.put(key, value);
+        }
+
+        return result;
+    }
+
+    /** The reuse instance is ignored; this behaves exactly like {@link #deserialize(DataInputView)}. */
+    @Override
+    public HashMap<K, V> deserialize(HashMap<K, V> reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    /** Copies one serialized map from the source to the target, record by record. */
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        final int size = source.readInt();
+        target.writeInt(size);
+
+        for (int remaining = size; remaining > 0; remaining--) {
+            keySerializer.copy(source, target);
+
+            final boolean isNull = source.readBoolean();
+            target.writeBoolean(isNull);
+            if (isNull) {
+                // nothing was written for a null value
+                continue;
+            }
+            valueSerializer.copy(source, target);
+        }
+    }
+
+    /** Two serializers are equal if they have the same class and equal key and value serializers. */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+
+        final HashMapSerializer<?, ?> other = (HashMapSerializer<?, ?>) obj;
+        return keySerializer.equals(other.getKeySerializer())
+            && valueSerializer.equals(other.getValueSerializer());
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        return obj.getClass() == getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * keySerializer.hashCode() + valueSerializer.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
new file mode 100644
index 0000000..6cddf6d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.MapState;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Thin wrapper around a {@link MapState} that hands out empty views instead of {@code null}.
+ *
+ * <p>Single-key operations, {@code putAll}, {@code clear} and {@code size} are forwarded
+ * unchanged. The bulk accessors ({@code entries}, {@code keys}, {@code values} and
+ * {@code iterator}) fall back to the corresponding view of an empty map whenever the wrapped
+ * state returns {@code null}.
+ *
+ * @param <K> The type of keys in the map state.
+ * @param <V> The type of values in the map state.
+ */
+class UserFacingMapState<K, V> implements MapState<K, V> {
+
+    /** The wrapped state that all calls are forwarded to. */
+    private final MapState<K, V> originalState;
+
+    /** Source of the empty views that replace {@code null} results. */
+    private final Map<K, V> emptyState = Collections.<K, V>emptyMap();
+
+    UserFacingMapState(MapState<K, V> originalState) {
+        this.originalState = originalState;
+    }
+
+    // ------------------------------------------------------------------------
+    //  plain delegation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public V get(K key) throws Exception {
+        return originalState.get(key);
+    }
+
+    @Override
+    public void put(K key, V value) throws Exception {
+        originalState.put(key, value);
+    }
+
+    @Override
+    public void putAll(Map<K, V> value) throws Exception {
+        originalState.putAll(value);
+    }
+
+    @Override
+    public void clear() {
+        originalState.clear();
+    }
+
+    @Override
+    public void remove(K key) throws Exception {
+        originalState.remove(key);
+    }
+
+    @Override
+    public boolean contains(K key) throws Exception {
+        return originalState.contains(key);
+    }
+
+    @Override
+    public int size() throws Exception {
+        return originalState.size();
+    }
+
+    // ------------------------------------------------------------------------
+    //  delegation with a fallback to an empty view
+    // ------------------------------------------------------------------------
+
+    @Override
+    public Iterable<Map.Entry<K, V>> entries() throws Exception {
+        final Iterable<Map.Entry<K, V>> backing = originalState.entries();
+        if (backing == null) {
+            return emptyState.entrySet();
+        }
+        return backing;
+    }
+
+    @Override
+    public Iterable<K> keys() throws Exception {
+        final Iterable<K> backing = originalState.keys();
+        if (backing == null) {
+            return emptyState.keySet();
+        }
+        return backing;
+    }
+
+    @Override
+    public Iterable<V> values() throws Exception {
+        final Iterable<V> backing = originalState.values();
+        if (backing == null) {
+            return emptyState.values();
+        }
+        return backing;
+    }
+
+    @Override
+    public Iterator<Map.Entry<K, V>> iterator() throws Exception {
+        final Iterator<Map.Entry<K, V>> backing = originalState.iterator();
+        if (backing == null) {
+            return emptyState.entrySet().iterator();
+        }
+        return backing;
+    }
+}