You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/18 09:02:43 UTC

flink git commit: [FLINK-9070][state] Improve the performance of RocksDBMapState.clear() with WriteBatch.

Repository: flink
Updated Branches:
  refs/heads/master fafef15ea -> 87e54eb3b


[FLINK-9070][state] Improve the performance of RocksDBMapState.clear() with WriteBatch.

This closes #5979.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87e54eb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87e54eb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87e54eb3

Branch: refs/heads/master
Commit: 87e54eb3bc181c0119842c975de632706d23434c
Parents: fafef15
Author: sihuazhou <su...@163.com>
Authored: Sat Apr 7 21:52:05 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 18 11:02:02 2018 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBMapState.java        | 51 ++++++++++++--------
 1 file changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87e54eb3/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index b84b7a2..5474a1c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -221,11 +222,23 @@ public class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public void clear() {
 		try {
-			Iterator<Map.Entry<UK, UV>> iterator = iterator();
+			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
+				WriteBatch writeBatch = new WriteBatch(128)) {
 
-			while (iterator.hasNext()) {
-				iterator.next();
-				iterator.remove();
+				final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
+				iterator.seek(keyPrefixBytes);
+
+				while (iterator.isValid()) {
+					byte[] keyBytes = iterator.key();
+					if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
+						writeBatch.remove(columnFamily, keyBytes);
+					} else {
+						break;
+					}
+					iterator.next();
+				}
+
+				backend.db.write(writeOptions, writeBatch);
 			}
 		} catch (Exception e) {
 			LOG.warn("Error while cleaning the state.", e);
@@ -351,6 +364,20 @@ public class RocksDBMapState<K, N, UK, UV>
 		return isNull ? null : valueSerializer.deserialize(in);
 	}
 
+	private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
+		if (rawKeyBytes.length < keyPrefixBytes.length) {
+			return false;
+		}
+
+		for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
+			if (rawKeyBytes[i] != keyPrefixBytes[i]) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Internal Classes
 	// ------------------------------------------------------------------------
@@ -572,7 +599,7 @@ public class RocksDBMapState<K, N, UK, UV>
 				}
 
 				while (true) {
-					if (!iterator.isValid() || !underSameKey(iterator.key())) {
+					if (!iterator.isValid() || !startWithKeyPrefix(keyPrefixBytes, iterator.key())) {
 						expired = true;
 						break;
 					}
@@ -595,19 +622,5 @@ public class RocksDBMapState<K, N, UK, UV>
 				}
 			}
 		}
-
-		private boolean underSameKey(byte[] rawKeyBytes) {
-			if (rawKeyBytes.length < keyPrefixBytes.length) {
-				return false;
-			}
-
-			for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
-				if (rawKeyBytes[i] != keyPrefixBytes[i]) {
-					return false;
-				}
-			}
-
-			return true;
-		}
 	}
 }