You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/16 06:20:51 UTC
[flink] Diff for: [GitHub] klion26 closed pull request #6558: [FLINK-9116]
Introduce getAll and removeAll for MapState
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index 7a130d49083..5c28edca0de 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.PublicEvolving;
+import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
@@ -51,6 +52,16 @@
*/
UV get(UK key) throws Exception;
+ /**
+ * Returns the values currently associated with the given keys.
+ *
+ * <p>NOTE(review): how a key without a mapping is represented in the result (absent
+ * vs. mapped to {@code null}) is not specified here -- confirm against the implementations.
+ *
+ * @param keys The keys of the mappings to look up
+ * @return A map from the given keys to their values
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ Map<UK, UV> getAll(Collection<UK> keys) throws Exception;
+
/**
* Associates a new value with the given key.
*
@@ -79,6 +90,15 @@
*/
void remove(UK key) throws Exception;
+ /**
+ * Deletes the mappings of the given keys.
+ *
+ * @param keys The keys of the mappings to delete
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ void removeAll(Collection<UK> keys) throws Exception;
+
/**
* Returns whether there exists the given mapping.
*
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 4d510cf405b..ede7f986d11 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -37,6 +37,7 @@
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -151,6 +152,16 @@ public UV get(UK key) throws Exception {
return values.get(key);
}
+ /**
+  * Looks up every requested key through {@link #get(Object)} and collects the
+  * results, one entry per requested key, into a fresh map.
+  */
+ @Override
+ public Map<UK, UV> getAll(Collection<UK> keys) throws Exception {
+     final Map<UK, UV> result = new HashMap<>(keys.size());
+     final Iterator<UK> keyIterator = keys.iterator();
+     while (keyIterator.hasNext()) {
+         final UK userKey = keyIterator.next();
+         // Go through get(...) rather than touching the backing map directly.
+         result.put(userKey, get(userKey));
+     }
+     return result;
+ }
+
@Override
public void put(UK key, UV value) throws Exception {
stateWrites++;
@@ -173,6 +184,13 @@ public void remove(UK key) throws Exception {
values.remove(key);
}
+ /** Removes the given keys one at a time by delegating to {@link #remove(Object)}. */
+ @Override
+ public void removeAll(Collection<UK> keys) throws Exception {
+     final Iterator<UK> keyIterator = keys.iterator();
+     while (keyIterator.hasNext()) {
+         remove(keyIterator.next());
+     }
+ }
+
@Override
public boolean contains(UK key) throws Exception {
if (values == null) {
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index 4d51b7de6be..123919c84be 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -52,6 +53,16 @@ public V get(K key) {
return state.get(key);
}
+ /**
+  * Builds a new map holding, for each requested key, whatever {@code state.get(key)}
+  * returns for it.
+  */
+ @Override
+ public Map<K, V> getAll(Collection<K> keys) {
+     final Map<K, V> result = new HashMap<>(keys.size());
+     keys.forEach(requestedKey -> result.put(requestedKey, state.get(requestedKey)));
+     return result;
+ }
+
@Override
public void put(K key, V value) {
throw MODIFICATION_ATTEMPT_ERROR;
@@ -67,6 +78,11 @@ public void remove(K key) {
throw MODIFICATION_ATTEMPT_ERROR;
}
+ /**
+ * Always throws MODIFICATION_ATTEMPT_ERROR, exactly like put and remove in this class.
+ */
+ @Override
+ public void removeAll(Collection<K> keys) {
+ throw MODIFICATION_ATTEMPT_ERROR;
+ }
+
@Override
public boolean contains(K key) {
return state.containsKey(key);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
index ce4d032c882..f82cd3395a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.state.MapState;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
@@ -47,6 +48,11 @@ public V get(K key) throws Exception {
return originalState.get(key);
}
+ /** Forwards the bulk lookup unchanged to the wrapped originalState. */
+ @Override
+ public Map<K, V> getAll(Collection<K> keys) throws Exception {
+ return originalState.getAll(keys);
+ }
+
@Override
public void put(K key, V value) throws Exception {
originalState.put(key, value);
@@ -67,6 +73,11 @@ public void remove(K key) throws Exception {
originalState.remove(key);
}
+ /** Forwards the bulk removal unchanged to the wrapped originalState. */
+ @Override
+ public void removeAll(Collection<K> keys) throws Exception {
+ originalState.removeAll(keys);
+ }
+
@Override
public boolean contains(K key) throws Exception {
return originalState.contains(key);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 745e7f4f58e..e3b9a4d4ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -28,6 +28,8 @@
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -82,7 +84,7 @@ private HeapMapState(
@Override
public UV get(UK userKey) {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
if (userMap == null) {
return null;
@@ -91,10 +93,26 @@ public UV get(UK userKey) {
return userMap.get(userKey);
}
+ /**
+  * Reads the values of the given keys from the user map of the current namespace.
+  *
+  * <p>When no user map exists, an empty map is returned. Otherwise the result holds
+  * one entry per requested key, with the value being whatever the user map returns
+  * for that key.
+  */
+ @Override
+ public Map<UK, UV> getAll(Collection<UK> keys) {
+     final Map<UK, UV> currentMap = getUserMap();
+     if (currentMap != null) {
+         final Map<UK, UV> result = new HashMap<>(keys.size());
+         keys.forEach(userKey -> result.put(userKey, currentMap.get(userKey)));
+         return result;
+     }
+     return Collections.emptyMap();
+ }
+
@Override
public void put(UK userKey, UV userValue) {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
if (userMap == null) {
userMap = new HashMap<>();
stateTable.put(currentNamespace, userMap);
@@ -106,7 +124,7 @@ public void put(UK userKey, UV userValue) {
@Override
public void putAll(Map<UK, UV> value) {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
if (userMap == null) {
userMap = new HashMap<>();
@@ -119,7 +137,7 @@ public void putAll(Map<UK, UV> value) {
@Override
public void remove(UK userKey) {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
if (userMap == null) {
return;
}
@@ -131,33 +149,50 @@ public void remove(UK userKey) {
}
}
+ /**
+  * Removes the given keys from the user map of the current namespace, if one exists,
+  * and calls {@code clear()} when that leaves the user map empty.
+  */
+ @Override
+ public void removeAll(Collection<UK> keys) {
+     final Map<UK, UV> currentMap = getUserMap();
+     if (currentMap != null) {
+         keys.forEach(userKey -> currentMap.remove(userKey));
+         // Same follow-up as after the loop in the single-key path: an emptied map is cleared.
+         if (currentMap.isEmpty()) {
+             clear();
+         }
+     }
+ }
+
@Override
public boolean contains(UK userKey) {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
return userMap != null && userMap.containsKey(userKey);
}
@Override
public Iterable<Map.Entry<UK, UV>> entries() {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
return userMap == null ? null : userMap.entrySet();
}
@Override
public Iterable<UK> keys() {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
return userMap == null ? null : userMap.keySet();
}
@Override
public Iterable<UV> values() {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
return userMap == null ? null : userMap.values();
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() {
- Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = getUserMap();
return userMap == null ? null : userMap.entrySet().iterator();
}
@@ -202,4 +237,8 @@ public boolean contains(UK userKey) {
stateTable.getNamespaceSerializer(),
(Map<UK, UV>) stateDesc.getDefaultValue());
}
+
+ /**
+ * Fetches the user map that the state table holds for the current namespace.
+ * Callers in this class treat a null result as "no user map yet".
+ */
+ private Map<UK, UV> getUserMap() {
+ return stateTable.get(currentNamespace);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index f6f81ffc940..283e2287570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -26,6 +26,7 @@
import javax.annotation.Nonnull;
import java.util.AbstractMap;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -57,6 +58,23 @@ public UV get(UK key) throws Exception {
return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
}
+ /**
+  * Bulk variant of {@link #get(Object)}: fetches the wrapped values for all given keys
+  * from the original state and runs each through the same TTL check-and-update step
+  * that {@code get} uses.
+  *
+  * @param keys The keys of the mappings to look up
+  * @return An empty map if the original state returned nothing; otherwise a map with
+  *         one entry per requested key holding the result of the TTL check for that key
+  *
+  * @throws Exception Thrown if the system cannot access the state.
+  */
+ @Override
+ public Map<UK, UV> getAll(Collection<UK> keys) throws Exception {
+     Map<UK, TtlValue<UV>> ttlValueMap = original.getAll(keys);
+     if (ttlValueMap.isEmpty()) {
+         return Collections.emptyMap();
+     }
+
+     Map<UK, UV> result = new HashMap<>(keys.size());
+     for (UK key : keys) {
+         TtlValue<UV> ttlValue = ttlValueMap.get(key);
+         // The updater must store the value it is handed (v), exactly as get(key) does.
+         // Writing back the originally read ttlValue would discard whatever the TTL helper
+         // passed in (presumably the value re-wrapped with a fresh timestamp -- confirm
+         // against getWithTtlCheckAndUpdate).
+         UV value = getWithTtlCheckAndUpdate(() -> ttlValue, v -> original.put(key, v), () -> original.remove(key));
+         result.put(key, value);
+     }
+
+     return result;
+ }
+
@Override
public void put(UK key, UV value) throws Exception {
original.put(key, wrapWithTs(value));
@@ -79,6 +97,11 @@ public void remove(UK key) throws Exception {
original.remove(key);
}
+ /** Forwards the bulk removal to the original state; no per-key TTL check is made here. */
+ @Override
+ public void removeAll(Collection<UK> keys) throws Exception {
+ original.removeAll(keys);
+ }
+
@Override
public boolean contains(UK key) throws Exception {
return get(key) != null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 059a706c6a8..ee9d21d56fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2872,6 +2872,22 @@ public void testMapState() throws Exception {
assertTrue(entry.getValue().endsWith(updateSuffix));
}
+ // getAll / removeAll: bulk read, then bulk delete, of the entries just written
+ backend.setCurrentKey("3");
+ Map<Integer, String> kv = new HashMap<>();
+ for (int i = 0; i < 10; ++i) {
+ kv.put(i, String.valueOf(i));
+ }
+ state.putAll(kv);
+
+ Map<Integer, String> actural = state.getAll(kv.keySet());
+ assertEquals(kv, actural);
+
+ state.removeAll(kv.keySet());
+ for (Integer key : kv.keySet()) {
+ assertNull(state.get(key));
+ }
+
backend.dispose();
// restore the first snapshot and validate it
backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 9b5ac10c92d..5cd37134663 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -46,6 +47,18 @@ public UV get(UK key) {
return getInternal().get(key);
}
+ /**
+  * Collects, for each requested key, the value the internal map returns for it
+  * into a newly created map.
+  */
+ @Override
+ public Map<UK, UV> getAll(Collection<UK> keys) {
+     final Map<UK, UV> backing = getInternal();
+     final Map<UK, UV> result = new HashMap<>(keys.size());
+     keys.forEach(userKey -> result.put(userKey, backing.get(userKey)));
+     return result;
+ }
+
@Override
public void put(UK key, UV value) {
this.getInternal().put(key, value);
@@ -61,6 +74,15 @@ public void remove(UK key) {
getInternal().remove(key);
}
+ /** Removes each of the given keys from the internal map. */
+ @Override
+ public void removeAll(Collection<UK> keys) {
+     final Map<UK, UV> backing = getInternal();
+     keys.forEach(userKey -> backing.remove(userKey));
+ }
+
@Override
public boolean contains(UK key) {
return getInternal().containsKey(key);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index ad6b7c22ec4..e881cad991c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -48,6 +48,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -125,6 +127,16 @@ public UV get(UK userKey) throws IOException, RocksDBException {
return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
}
+ /**
+  * Reads the given keys one by one through {@code get(key)} and gathers the results,
+  * one entry per requested key, into a new map.
+  */
+ @Override
+ public Map<UK, UV> getAll(Collection<UK> keys) throws IOException, RocksDBException {
+     final Map<UK, UV> result = new HashMap<>(keys.size());
+     final Iterator<UK> keyIterator = keys.iterator();
+     while (keyIterator.hasNext()) {
+         final UK userKey = keyIterator.next();
+         result.put(userKey, get(userKey));
+     }
+     return result;
+ }
+
@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
@@ -156,6 +168,13 @@ public void remove(UK userKey) throws IOException, RocksDBException {
backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
}
+ /**
+  * Deletes the given keys one by one through {@code remove(key)}.
+  *
+  * <p>Declares the same checked exceptions as {@code remove}, the only call made here,
+  * instead of the blanket {@code Exception}, so the signature matches the other
+  * accessors of this class.
+  *
+  * @param keys The keys of the mappings to delete
+  *
+  * @throws IOException Propagated from {@code remove(key)}.
+  * @throws RocksDBException Propagated from {@code remove(key)}.
+  */
+ @Override
+ public void removeAll(Collection<UK> keys) throws IOException, RocksDBException {
+     for (UK key : keys) {
+         remove(key);
+     }
+ }
+
@Override
public boolean contains(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
With regards,
Apache Git Services