You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/16 06:20:51 UTC

[flink] Diff for: [GitHub] klion26 closed pull request #6558: [FLINK-9116] Introduce getAll and removeAll for MapState

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index 7a130d49083..5c28edca0de 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -51,6 +52,16 @@
 	 */
 	UV get(UK key) throws Exception;
 
+	/**
+	 * Returns the current values associated with the given keys.
+	 *
+	 * @param keys The keys of the mappings
+	 * @return The key-value map of the mappings with the given keys
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	Map<UK, UV> getAll(Collection<UK> keys) throws Exception;
+
 	/**
 	 * Associates a new value with the given key.
 	 *
@@ -79,6 +90,15 @@
 	 */
 	void remove(UK key) throws Exception;
 
+	/**
+	 * Deletes the mappings of the given keys.
+	 *
+	 * @param keys The keys of the mappings
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	void removeAll(Collection<UK> keys) throws Exception;
+
 	/**
 	 * Returns whether there exists the given mapping.
 	 *
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 4d510cf405b..ede7f986d11 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -37,6 +37,7 @@
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -151,6 +152,16 @@ public UV get(UK key) throws Exception {
 					return values.get(key);
 				}
 
+				@Override
+				public Map<UK, UV> getAll(Collection<UK> keys) throws Exception {
+					// Route every lookup through get(key) and collect the results per key.
+					final Map<UK, UV> result = new HashMap<>(keys.size());
+					for (UK requested : keys) {
+						result.put(requested, get(requested));
+					}
+					return result;
+				}
+
 				@Override
 				public void put(UK key, UV value) throws Exception {
 					stateWrites++;
@@ -173,6 +184,13 @@ public void remove(UK key) throws Exception {
 					values.remove(key);
 				}
 
+				@Override
+				public void removeAll(Collection<UK> keys) throws Exception {
+					for (UK key : keys) {
+						remove(key); // reuse the single-key path for each entry
+					}
+				}
+
 				@Override
 				public boolean contains(UK key) throws Exception {
 					if (values == null) {
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index 4d51b7de6be..123919c84be 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +53,16 @@ public V get(K key) {
 		return state.get(key);
 	}
 
+	@Override
+	public Map<K, V> getAll(Collection<K> keys) {
+		final Map<K, V> result = new HashMap<>(keys.size());
+		for (K requested : keys) {
+			// Keys without a mapping are still recorded, with whatever state.get returns for them.
+			result.put(requested, state.get(requested));
+		}
+		return result;
+	}
+
 	@Override
 	public void put(K key, V value) {
 		throw MODIFICATION_ATTEMPT_ERROR;
@@ -67,6 +78,11 @@ public void remove(K key) {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
 
+	@Override
+	public void removeAll(Collection<K> keys) {
+		throw MODIFICATION_ATTEMPT_ERROR; // same rejection as put(K, V) and remove(K)
+	}
+
 	@Override
 	public boolean contains(K key) {
 		return state.containsKey(key);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
index ce4d032c882..f82cd3395a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.state.MapState;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
@@ -47,6 +48,11 @@ public V get(K key) throws Exception {
 		return originalState.get(key);
 	}
 
+	@Override
+	public Map<K, V> getAll(Collection<K> keys) throws Exception {
+		return originalState.getAll(keys); // forwarded unchanged to the wrapped state
+	}
+
 	@Override
 	public void put(K key, V value) throws Exception {
 		originalState.put(key, value);
@@ -67,6 +73,11 @@ public void remove(K key) throws Exception {
 		originalState.remove(key);
 	}
 
+	@Override
+	public void removeAll(Collection<K> keys) throws Exception {
+		originalState.removeAll(keys); // forwarded unchanged to the wrapped state
+	}
+
 	@Override
 	public boolean contains(K key) throws Exception {
 		return originalState.contains(key);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 745e7f4f58e..e3b9a4d4ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -28,6 +28,8 @@
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -82,7 +84,7 @@ private HeapMapState(
 	@Override
 	public UV get(UK userKey) {
 
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 
 		if (userMap == null) {
 			return null;
@@ -91,10 +93,26 @@ public UV get(UK userKey) {
 		return userMap.get(userKey);
 	}
 
+	@Override
+	public Map<UK, UV> getAll(Collection<UK> keys) {
+		// Bulk lookup against the map stored for the current namespace.
+		Map<UK, UV> userMap = getUserMap();
+
+		if (userMap == null) {
+			return Collections.emptyMap();
+		}
+		// NOTE(review): unlike the branch above, every requested key is put here (null if unmapped).
+		Map<UK, UV> maps = new HashMap<>(keys.size());
+		for (UK key : keys) {
+			maps.put(key, userMap.get(key));
+		}
+		return maps;
+	}
+
 	@Override
 	public void put(UK userKey, UV userValue) {
 
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 		if (userMap == null) {
 			userMap = new HashMap<>();
 			stateTable.put(currentNamespace, userMap);
@@ -106,7 +124,7 @@ public void put(UK userKey, UV userValue) {
 	@Override
 	public void putAll(Map<UK, UV> value) {
 
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 
 		if (userMap == null) {
 			userMap = new HashMap<>();
@@ -119,7 +137,7 @@ public void putAll(Map<UK, UV> value) {
 	@Override
 	public void remove(UK userKey) {
 
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 		if (userMap == null) {
 			return;
 		}
@@ -131,33 +149,50 @@ public void remove(UK userKey) {
 		}
 	}
 
+	@Override
+	public void removeAll(Collection<UK> keys) {
+		// Bulk removal from the map stored for the current namespace; nothing to do if there is none.
+		Map<UK, UV> userMap = getUserMap();
+
+		if (userMap == null) {
+			return;
+		}
+		for (UK key : keys) {
+			userMap.remove(key);
+		}
+		// Once the last entry is gone, clear() is invoked instead of keeping an empty map around.
+		if (userMap.isEmpty()) {
+			clear();
+		}
+	}
+
 	@Override
 	public boolean contains(UK userKey) {
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 		return userMap != null && userMap.containsKey(userKey);
 	}
 
 	@Override
 	public Iterable<Map.Entry<UK, UV>> entries() {
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 		return userMap == null ? null : userMap.entrySet();
 	}
 
 	@Override
 	public Iterable<UK> keys() {
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 		return userMap == null ? null : userMap.keySet();
 	}
 
 	@Override
 	public Iterable<UV> values() {
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 		return userMap == null ? null : userMap.values();
 	}
 
 	@Override
 	public Iterator<Map.Entry<UK, UV>> iterator() {
-		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = getUserMap();
 		return userMap == null ? null : userMap.entrySet().iterator();
 	}
 
@@ -202,4 +237,8 @@ public boolean contains(UK userKey) {
 			stateTable.getNamespaceSerializer(),
 			(Map<UK, UV>) stateDesc.getDefaultValue());
 	}
+
+	private Map<UK, UV> getUserMap() {
+		return stateTable.get(currentNamespace); // callers in this class null-check the result before use
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index f6f81ffc940..283e2287570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -26,6 +26,7 @@
 import javax.annotation.Nonnull;
 
 import java.util.AbstractMap;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -57,6 +58,23 @@ public UV get(UK key) throws Exception {
 		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
 	}
 
+	@Override
+	public Map<UK, UV> getAll(Collection<UK> keys) throws Exception {
+		Map<UK, TtlValue<UV>> ttlValueMap = original.getAll(keys);
+		if (ttlValueMap.isEmpty()) {
+			return Collections.emptyMap();
+		}
+		// As in get(key): the update callback must store the value it receives (v), not the one just read.
+		Map<UK, UV> maps = new HashMap<>(keys.size());
+		for (UK key : keys) {
+			TtlValue<UV> ttlValue = ttlValueMap.get(key);
+			UV value = getWithTtlCheckAndUpdate(() -> ttlValue, v -> original.put(key, v), () -> original.remove(key));
+			maps.put(key, value);
+		}
+
+		return maps;
+	}
+
 	@Override
 	public void put(UK key, UV value) throws Exception {
 		original.put(key, wrapWithTs(value));
@@ -79,6 +97,11 @@ public void remove(UK key) throws Exception {
 		original.remove(key);
 	}
 
+	@Override
+	public void removeAll(Collection<UK> keys) throws Exception {
+		original.removeAll(keys); // forwarded as-is, like remove(key) forwards to original.remove(key)
+	}
+
 	@Override
 	public boolean contains(UK key) throws Exception {
 		return get(key) != null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 059a706c6a8..ee9d21d56fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2872,6 +2872,22 @@ public void testMapState() throws Exception {
 			assertTrue(entry.getValue().endsWith(updateSuffix));
 		}
 
+		// getAll
+		backend.setCurrentKey("3");
+		Map<Integer, String> kv = new HashMap<>();
+		for (int i = 0; i < 10; ++i) {
+			kv.put(i, String.valueOf(i));
+		}
+		state.putAll(kv);
+
+		Map<Integer, String> actural = state.getAll(kv.keySet());
+		assertEquals(kv, actural);
+
+		state.removeAll(kv.keySet());
+		for (Integer key : kv.keySet()) {
+			assertNull(state.get(key));
+		}
+
 		backend.dispose();
 		// restore the first snapshot and validate it
 		backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 9b5ac10c92d..5cd37134663 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -46,6 +47,18 @@ public UV get(UK key) {
 		return getInternal().get(key);
 	}
 
+	@Override
+	public Map<UK, UV> getAll(Collection<UK> keys) {
+		// Resolve the internal map once, then copy out the value of each requested key.
+		final Map<UK, UV> source = getInternal();
+		final Map<UK, UV> result = new HashMap<>(keys.size());
+		for (UK requested : keys) {
+			final UV found = source.get(requested);
+			result.put(requested, found);
+		}
+		return result;
+	}
+
 	@Override
 	public void put(UK key, UV value) {
 		this.getInternal().put(key, value);
@@ -61,6 +74,15 @@ public void remove(UK key) {
 		getInternal().remove(key);
 	}
 
+	@Override
+	public void removeAll(Collection<UK> keys) {
+		// Fetch the internal map once and remove every given key from it.
+		Map<UK, UV> internalMaps = getInternal();
+		for (UK key : keys) {
+			internalMaps.remove(key);
+		}
+	}
+
 	@Override
 	public boolean contains(UK key) {
 		return getInternal().containsKey(key);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index ad6b7c22ec4..e881cad991c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -48,6 +48,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -125,6 +127,16 @@ public UV get(UK userKey) throws IOException, RocksDBException {
 		return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
 	}
 
+	@Override
+	public Map<UK, UV> getAll(Collection<UK> keys) throws IOException, RocksDBException {
+		// Each key is fetched through get(key), i.e. one lookup per requested key.
+		final Map<UK, UV> result = new HashMap<>(keys.size());
+		for (UK requested : keys) {
+			result.put(requested, get(requested));
+		}
+		return result;
+	}
+
 	@Override
 	public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
 
@@ -156,6 +168,13 @@ public void remove(UK userKey) throws IOException, RocksDBException {
 		backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
 	}
 
+	@Override
+	public void removeAll(Collection<UK> keys) throws IOException, RocksDBException {
+		for (UK key : keys) {
+			remove(key); // one remove(key) call per entry, so only its checked exceptions can escape
+		}
+	}
+
 	@Override
 	public boolean contains(UK userKey) throws IOException, RocksDBException {
 		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);


With regards,
Apache Git Services