You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/06 20:42:48 UTC

[flink] 01/04: [FLINK-13034][state backends] Add isEmpty method for MapState

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 051692a9d2ad8bfffc1087930c3ec51d692d8cc6
Author: Yun Tang <my...@live.com>
AuthorDate: Mon Jul 29 01:24:32 2019 +0800

    [FLINK-13034][state backends] Add isEmpty method for MapState
    
    This closes #9255
---
 docs/dev/stream/state/state.md                     |  1 +
 docs/dev/stream/state/state.zh.md                  |  2 +-
 .../apache/flink/api/common/state/MapState.java    |  9 +++++++
 .../org/apache/flink/cep/operator/CepOperator.java |  2 +-
 .../apache/flink/cep/utils/TestSharedBuffer.java   |  9 +++++++
 .../client/state/ImmutableMapState.java            |  5 ++++
 .../client/state/ImmutableMapStateTest.java        |  6 +++++
 .../flink/runtime/state/UserFacingMapState.java    |  5 ++++
 .../flink/runtime/state/heap/HeapMapState.java     |  6 +++++
 .../flink/runtime/state/ttl/TtlMapState.java       |  6 +++++
 .../flink/runtime/state/StateBackendTestBase.java  | 30 ++++++++++++++++++++--
 .../state/ttl/mock/MockInternalMapState.java       |  5 ++++
 .../contrib/streaming/state/RocksDBMapState.java   | 12 +++++++++
 .../apache/flink/table/api/dataview/MapView.java   | 11 ++++++++
 .../apache/flink/table/dataview/StateMapView.scala |  2 ++
 .../table/runtime/join/TemporalRowtimeJoin.scala   |  2 +-
 .../flink/table/runtime/dataview/StateMapView.java | 10 ++++++++
 .../join/temporal/TemporalRowTimeJoinOperator.java |  2 +-
 .../AbstractRowTimeUnboundedPrecedingOver.java     |  3 +--
 .../operators/window/MergingWindowSetTest.java     | 10 ++++++++
 20 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index c6d4c95..14ce200 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -115,6 +115,7 @@ added using `add(T)` are folded into an aggregate using a specified `FoldFunctio
 retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
 `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
 views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
+You can also use `isEmpty()` to check whether this map contains any key-value mappings.
 
 All types of state also have a method `clear()` that clears the state for the currently
 active key, i.e. the key of the input element.
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index 78da641..b5d40eb 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -87,7 +87,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态
 接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。
 
 * `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。
- 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。
+ 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。
 
 所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index 7a130d4..94eb275 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -124,4 +124,13 @@ public interface MapState<UK, UV> extends State {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
+
+	/**
+	 * Returns true if this state contains no key-value mappings, otherwise false.
+	 *
+	 * @return True if this state contains no key-value mappings, otherwise false.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	boolean isEmpty() throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index fe95c6d..2717c13 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -530,7 +530,7 @@ public class CepOperator<IN, KEY, OUT>
 	@VisibleForTesting
 	boolean hasNonEmptyPQ(KEY key) throws Exception {
 		setCurrentKey(key);
-		return elementQueueState.keys().iterator().hasNext();
+		return !elementQueueState.isEmpty();
 	}
 
 	@VisibleForTesting
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 4d510cf..1f5672d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -220,6 +220,15 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
 				}
 
 				@Override
+				public boolean isEmpty() throws Exception {
+					if (values == null) {
+						return true;
+					}
+
+					return values.isEmpty();
+				}
+
+				@Override
 				public void clear() {
 					stateWrites++;
 					this.values = null;
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index 4d51b7d..7a20a96 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -114,6 +114,11 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
 	}
 
 	@Override
+	public boolean isEmpty() {
+		return state.isEmpty();
+	}
+
+	@Override
 	public void clear() {
 		throw MODIFICATION_ATTEMPT_ERROR;
 	}
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
index 6465257..3694c54 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -186,4 +187,9 @@ public class ImmutableMapStateTest {
 
 		mapState.clear();
 	}
+
+	@Test
+	public void testIsEmpty() throws Exception {
+		assertFalse(mapState.isEmpty());
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
index ce4d032..301909b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
@@ -95,4 +95,9 @@ class UserFacingMapState<K, V> implements MapState<K, V> {
 		Iterator<Map.Entry<K, V>> original = originalState.iterator();
 		return original != null ? original : emptyState.entrySet().iterator();
 	}
+
+	@Override
+	public boolean isEmpty() throws Exception {
+		return originalState.isEmpty();
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 745e7f4..23620b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -162,6 +162,12 @@ class HeapMapState<K, N, UK, UV>
 	}
 
 	@Override
+	public boolean isEmpty() {
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
+		return userMap == null || userMap.isEmpty();
+	}
+
+	@Override
 	public byte[] getSerializedValue(
 			final byte[] serializedKeyAndNamespace,
 			final TypeSerializer<K> safeKeySerializer,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index c3f624a..cb06174 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -121,6 +121,12 @@ class TtlMapState<K, N, UK, UV>
 		return entries().iterator();
 	}
 
+	@Override
+	public boolean isEmpty() throws Exception {
+		accessCallback.run();
+		return original.isEmpty();
+	}
+
 	@Nullable
 	@Override
 	public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 132fd01..f90720d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2573,7 +2573,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
 		assertEquals(keys.size(), expectedKeys.size());
 		keys.removeAll(expectedKeys);
-		assertTrue(keys.isEmpty());
 
 		List<String> values = new ArrayList<>();
 		for (String value : state.values()) {
@@ -2582,7 +2581,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		List<String> expectedValues = Arrays.asList("103", "1031", "1032");
 		assertEquals(values.size(), expectedValues.size());
 		values.removeAll(expectedValues);
-		assertTrue(values.isEmpty());
 
 		// make some more modifications
 		backend.setCurrentKey("1");
@@ -2655,6 +2653,34 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		backend.dispose();
 	}
 
+	@Test
+	public void testMapStateIsEmpty() throws Exception {
+		MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		try {
+			MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			backend.setCurrentKey(1);
+			assertTrue(state.isEmpty());
+
+			int stateSize = 1024;
+			for (int i = 0; i < stateSize; i++) {
+				state.put(i, i * 2L);
+				assertFalse(state.isEmpty());
+			}
+
+			for (int i = 0; i < stateSize; i++) {
+				assertFalse(state.isEmpty());
+				state.remove(i);
+			}
+			assertTrue(state.isEmpty());
+
+		} finally {
+			backend.dispose();
+		}
+	}
+
 	/**
 	 * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 9b5ac10..28b2a24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -86,6 +86,11 @@ public class MockInternalMapState<K, N, UK, UV>
 		return entries().iterator();
 	}
 
+	@Override
+	public boolean isEmpty() {
+		return getInternal().isEmpty();
+	}
+
 	@SuppressWarnings({"unchecked", "unused"})
 	static <N, T, S extends State, IS extends S> IS createState(
 		TypeSerializer<N> namespaceSerializer,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 64ce823..e7e1d25 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -240,6 +240,18 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
+	public boolean isEmpty() {
+		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
+
+		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+
+			iterator.seek(prefixBytes);
+
+			return !iterator.isValid() || !startWithKeyPrefix(prefixBytes, iterator.key());
+		}
+	}
+
+	@Override
 	public void clear() {
 		try {
 			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
index 7feb07b..b7a3704 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
@@ -204,6 +204,17 @@ public class MapView<K, V> implements DataView {
 	}
 
 	/**
+	 * Returns true if the map view contains no key-value mappings, otherwise false.
+	 *
+	 * @return True if the map view contains no key-value mappings, otherwise false.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public boolean isEmpty() throws Exception {
+		return map.isEmpty();
+	}
+
+	/**
 	 * Removes all entries of this map.
 	 */
 	@Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
index 22f5f0b..2096cf6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
@@ -50,5 +50,7 @@ class StateMapView[K, V](state: MapState[K, V]) extends MapView[K, V] {
 
   override def iterator: util.Iterator[util.Map.Entry[K, V]] = state.iterator()
 
+  override def isEmpty(): Boolean = state.isEmpty
+
   override def clear(): Unit = state.clear()
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
index f691109..73c8760 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
@@ -169,7 +169,7 @@ class TemporalRowtimeJoin(
 
     // if we have more state at any side, then update the timer, else clean it up.
     if (stateCleaningEnabled) {
-      if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) {
+      if (lastUnprocessedTime < Long.MaxValue || !rightState.isEmpty) {
         registerProcessingCleanUpTimer()
       } else {
         cleanUpLastTimer()
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
index c7f2686..16d96d6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
@@ -108,6 +108,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements
 		}
 
 		@Override
+		public boolean isEmpty() throws Exception {
+			return getMapState().isEmpty();
+		}
+
+		@Override
 		public void clear() {
 			getMapState().clear();
 		}
@@ -192,6 +197,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements
 		}
 
 		@Override
+		public boolean isEmpty() throws Exception {
+			return getMapState().isEmpty() && getNullState().value() == null;
+		}
+
+		@Override
 		public void clear() {
 			getMapState().clear();
 			getNullState().clear();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index 64be99b..f55fdd1 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -189,7 +189,7 @@ public class TemporalRowTimeJoinOperator
 
 		// if we have more state at any side, then update the timer, else clean it up.
 		if (stateCleaningEnabled) {
-			if (lastUnprocessedTime < Long.MAX_VALUE || rightState.iterator().hasNext()) {
+			if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
 				registerProcessingCleanupTimer();
 			} else {
 				cleanupLastTimer();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
index e5a3216..297f8bd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
@@ -157,8 +157,7 @@ public abstract class AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProc
 			if (stateCleaningEnabled) {
 
 				// we check whether there are still records which have not been processed yet
-				boolean noRecordsToProcess = !inputState.keys().iterator().hasNext();
-				if (noRecordsToProcess) {
+				if (inputState.isEmpty()) {
 					// we clean the state
 					cleanupState(inputState, accState);
 					function.cleanup();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
index 7a502b6..e53751f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
@@ -418,6 +418,11 @@ public class MergingWindowSetTest {
 		}
 
 		@Override
+		public boolean isEmpty() throws Exception {
+			return map.isEmpty();
+		}
+
+		@Override
 		public void clear() {
 			map.clear();
 		}
@@ -589,6 +594,11 @@ public class MergingWindowSetTest {
 		}
 
 		@Override
+		public boolean isEmpty() {
+			return internalMap.isEmpty();
+		}
+
+		@Override
 		public void clear() {
 			internalMap.clear();
 		}