You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/06 20:42:48 UTC
[flink] 01/04: [FLINK-13034][state backends] Add isEmpty method for MapState
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 051692a9d2ad8bfffc1087930c3ec51d692d8cc6
Author: Yun Tang <my...@live.com>
AuthorDate: Mon Jul 29 01:24:32 2019 +0800
[FLINK-13034][state backends] Add isEmpty method for MapState
This closes #9255
---
docs/dev/stream/state/state.md | 1 +
docs/dev/stream/state/state.zh.md | 2 +-
.../apache/flink/api/common/state/MapState.java | 9 +++++++
.../org/apache/flink/cep/operator/CepOperator.java | 2 +-
.../apache/flink/cep/utils/TestSharedBuffer.java | 9 +++++++
.../client/state/ImmutableMapState.java | 5 ++++
.../client/state/ImmutableMapStateTest.java | 6 +++++
.../flink/runtime/state/UserFacingMapState.java | 5 ++++
.../flink/runtime/state/heap/HeapMapState.java | 6 +++++
.../flink/runtime/state/ttl/TtlMapState.java | 6 +++++
.../flink/runtime/state/StateBackendTestBase.java | 30 ++++++++++++++++++++--
.../state/ttl/mock/MockInternalMapState.java | 5 ++++
.../contrib/streaming/state/RocksDBMapState.java | 12 +++++++++
.../apache/flink/table/api/dataview/MapView.java | 11 ++++++++
.../apache/flink/table/dataview/StateMapView.scala | 2 ++
.../table/runtime/join/TemporalRowtimeJoin.scala | 2 +-
.../flink/table/runtime/dataview/StateMapView.java | 10 ++++++++
.../join/temporal/TemporalRowTimeJoinOperator.java | 2 +-
.../AbstractRowTimeUnboundedPrecedingOver.java | 3 +--
.../operators/window/MergingWindowSetTest.java | 10 ++++++++
20 files changed, 130 insertions(+), 8 deletions(-)
diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index c6d4c95..14ce200 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -115,6 +115,7 @@ added using `add(T)` are folded into an aggregate using a specified `FoldFunctio
retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
+You can also use `isEmpty()` to check whether this map contains any key-value mappings.
All types of state also have a method `clear()` that clears the state for the currently
active key, i.e. the key of the input element.
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index 78da641..b5d40eb 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -87,7 +87,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态
接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。
* `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。
- 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。
+ 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。
所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index 7a130d4..94eb275 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -124,4 +124,13 @@ public interface MapState<UK, UV> extends State {
* @throws Exception Thrown if the system cannot access the state.
*/
Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
+
+ /**
+ * Returns true if this state contains no key-value mappings, otherwise false.
+ *
+ * @return True if this state contains no key-value mappings, otherwise false.
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ boolean isEmpty() throws Exception;
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index fe95c6d..2717c13 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -530,7 +530,7 @@ public class CepOperator<IN, KEY, OUT>
@VisibleForTesting
boolean hasNonEmptyPQ(KEY key) throws Exception {
setCurrentKey(key);
- return elementQueueState.keys().iterator().hasNext();
+ return !elementQueueState.isEmpty();
}
@VisibleForTesting
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 4d510cf..1f5672d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -220,6 +220,15 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
}
@Override
+ public boolean isEmpty() throws Exception {
+ if (values == null) {
+ return true;
+ }
+
+ return values.isEmpty();
+ }
+
+ @Override
public void clear() {
stateWrites++;
this.values = null;
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index 4d51b7d..7a20a96 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -114,6 +114,11 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
}
@Override
+ public boolean isEmpty() {
+ return state.isEmpty();
+ }
+
+ @Override
public void clear() {
throw MODIFICATION_ATTEMPT_ERROR;
}
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
index 6465257..3694c54 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@@ -186,4 +187,9 @@ public class ImmutableMapStateTest {
mapState.clear();
}
+
+ @Test
+ public void testIsEmpty() throws Exception {
+ assertFalse(mapState.isEmpty());
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
index ce4d032..301909b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
@@ -95,4 +95,9 @@ class UserFacingMapState<K, V> implements MapState<K, V> {
Iterator<Map.Entry<K, V>> original = originalState.iterator();
return original != null ? original : emptyState.entrySet().iterator();
}
+
+ @Override
+ public boolean isEmpty() throws Exception {
+ return originalState.isEmpty();
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 745e7f4..23620b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -162,6 +162,12 @@ class HeapMapState<K, N, UK, UV>
}
@Override
+ public boolean isEmpty() {
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
+ return userMap == null || userMap.isEmpty();
+ }
+
+ @Override
public byte[] getSerializedValue(
final byte[] serializedKeyAndNamespace,
final TypeSerializer<K> safeKeySerializer,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index c3f624a..cb06174 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -121,6 +121,12 @@ class TtlMapState<K, N, UK, UV>
return entries().iterator();
}
+ @Override
+ public boolean isEmpty() throws Exception {
+ accessCallback.run();
+ return original.isEmpty();
+ }
+
@Nullable
@Override
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 132fd01..f90720d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2573,7 +2573,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
assertEquals(keys.size(), expectedKeys.size());
keys.removeAll(expectedKeys);
- assertTrue(keys.isEmpty());
List<String> values = new ArrayList<>();
for (String value : state.values()) {
@@ -2582,7 +2581,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
List<String> expectedValues = Arrays.asList("103", "1031", "1032");
assertEquals(values.size(), expectedValues.size());
values.removeAll(expectedValues);
- assertTrue(values.isEmpty());
// make some more modifications
backend.setCurrentKey("1");
@@ -2655,6 +2653,34 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
backend.dispose();
}
+ @Test
+ public void testMapStateIsEmpty() throws Exception {
+ MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+ try {
+ MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ backend.setCurrentKey(1);
+ assertTrue(state.isEmpty());
+
+ int stateSize = 1024;
+ for (int i = 0; i < stateSize; i++) {
+ state.put(i, i * 2L);
+ assertFalse(state.isEmpty());
+ }
+
+ for (int i = 0; i < stateSize; i++) {
+ assertFalse(state.isEmpty());
+ state.remove(i);
+ }
+ assertTrue(state.isEmpty());
+
+ } finally {
+ backend.dispose();
+ }
+ }
+
/**
* Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details.
*/
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 9b5ac10..28b2a24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -86,6 +86,11 @@ public class MockInternalMapState<K, N, UK, UV>
return entries().iterator();
}
+ @Override
+ public boolean isEmpty() {
+ return getInternal().isEmpty();
+ }
+
@SuppressWarnings({"unchecked", "unused"})
static <N, T, S extends State, IS extends S> IS createState(
TypeSerializer<N> namespaceSerializer,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 64ce823..e7e1d25 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -240,6 +240,18 @@ class RocksDBMapState<K, N, UK, UV>
}
@Override
+ public boolean isEmpty() {
+ final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
+
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+
+ iterator.seek(prefixBytes);
+
+ return !iterator.isValid() || !startWithKeyPrefix(prefixBytes, iterator.key());
+ }
+ }
+
+ @Override
public void clear() {
try {
try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
index 7feb07b..b7a3704 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
@@ -204,6 +204,17 @@ public class MapView<K, V> implements DataView {
}
/**
+ * Returns true if the map view contains no key-value mappings, otherwise false.
+ *
+ * @return True if the map view contains no key-value mappings, otherwise false.
+ *
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public boolean isEmpty() throws Exception {
+ return map.isEmpty();
+ }
+
+ /**
* Removes all entries of this map.
*/
@Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
index 22f5f0b..2096cf6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
@@ -50,5 +50,7 @@ class StateMapView[K, V](state: MapState[K, V]) extends MapView[K, V] {
override def iterator: util.Iterator[util.Map.Entry[K, V]] = state.iterator()
+ override def isEmpty(): Boolean = state.isEmpty
+
override def clear(): Unit = state.clear()
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
index f691109..73c8760 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
@@ -169,7 +169,7 @@ class TemporalRowtimeJoin(
// if we have more state at any side, then update the timer, else clean it up.
if (stateCleaningEnabled) {
- if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) {
+ if (lastUnprocessedTime < Long.MaxValue || !rightState.isEmpty) {
registerProcessingCleanUpTimer()
} else {
cleanUpLastTimer()
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
index c7f2686..16d96d6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
@@ -108,6 +108,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements
}
@Override
+ public boolean isEmpty() throws Exception {
+ return getMapState().isEmpty();
+ }
+
+ @Override
public void clear() {
getMapState().clear();
}
@@ -192,6 +197,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements
}
@Override
+ public boolean isEmpty() throws Exception {
+ return getMapState().isEmpty() && getNullState().value() == null;
+ }
+
+ @Override
public void clear() {
getMapState().clear();
getNullState().clear();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index 64be99b..f55fdd1 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -189,7 +189,7 @@ public class TemporalRowTimeJoinOperator
// if we have more state at any side, then update the timer, else clean it up.
if (stateCleaningEnabled) {
- if (lastUnprocessedTime < Long.MAX_VALUE || rightState.iterator().hasNext()) {
+ if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
registerProcessingCleanupTimer();
} else {
cleanupLastTimer();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
index e5a3216..297f8bd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
@@ -157,8 +157,7 @@ public abstract class AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProc
if (stateCleaningEnabled) {
// we check whether there are still records which have not been processed yet
- boolean noRecordsToProcess = !inputState.keys().iterator().hasNext();
- if (noRecordsToProcess) {
+ if (inputState.isEmpty()) {
// we clean the state
cleanupState(inputState, accState);
function.cleanup();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
index 7a502b6..e53751f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
@@ -418,6 +418,11 @@ public class MergingWindowSetTest {
}
@Override
+ public boolean isEmpty() throws Exception {
+ return map.isEmpty();
+ }
+
+ @Override
public void clear() {
map.clear();
}
@@ -589,6 +594,11 @@ public class MergingWindowSetTest {
}
@Override
+ public boolean isEmpty() {
+ return internalMap.isEmpty();
+ }
+
+ @Override
public void clear() {
internalMap.clear();
}