You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/18 09:02:43 UTC
flink git commit: [FLINK-9070][state] Improve the performance of
RocksDBMapState.clear() with WriteBatch.
Repository: flink
Updated Branches:
refs/heads/master fafef15ea -> 87e54eb3b
[FLINK-9070][state] Improve the performance of RocksDBMapState.clear() with WriteBatch.
This closes #5979.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87e54eb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87e54eb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87e54eb3
Branch: refs/heads/master
Commit: 87e54eb3bc181c0119842c975de632706d23434c
Parents: fafef15
Author: sihuazhou <su...@163.com>
Authored: Sat Apr 7 21:52:05 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 18 11:02:02 2018 +0200
----------------------------------------------------------------------
.../streaming/state/RocksDBMapState.java | 51 ++++++++++++--------
1 file changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87e54eb3/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index b84b7a2..5474a1c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -221,11 +222,23 @@ public class RocksDBMapState<K, N, UK, UV>
@Override
public void clear() {
try {
- Iterator<Map.Entry<UK, UV>> iterator = iterator();
+ try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
+ WriteBatch writeBatch = new WriteBatch(128)) {
- while (iterator.hasNext()) {
- iterator.next();
- iterator.remove();
+ final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
+ iterator.seek(keyPrefixBytes);
+
+ while (iterator.isValid()) {
+ byte[] keyBytes = iterator.key();
+ if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
+ writeBatch.remove(columnFamily, keyBytes);
+ } else {
+ break;
+ }
+ iterator.next();
+ }
+
+ backend.db.write(writeOptions, writeBatch);
}
} catch (Exception e) {
LOG.warn("Error while cleaning the state.", e);
@@ -351,6 +364,20 @@ public class RocksDBMapState<K, N, UK, UV>
return isNull ? null : valueSerializer.deserialize(in);
}
+ private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
+ if (rawKeyBytes.length < keyPrefixBytes.length) {
+ return false;
+ }
+
+ for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
+ if (rawKeyBytes[i] != keyPrefixBytes[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
// ------------------------------------------------------------------------
// Internal Classes
// ------------------------------------------------------------------------
@@ -572,7 +599,7 @@ public class RocksDBMapState<K, N, UK, UV>
}
while (true) {
- if (!iterator.isValid() || !underSameKey(iterator.key())) {
+ if (!iterator.isValid() || !startWithKeyPrefix(keyPrefixBytes, iterator.key())) {
expired = true;
break;
}
@@ -595,19 +622,5 @@ public class RocksDBMapState<K, N, UK, UV>
}
}
}
-
- private boolean underSameKey(byte[] rawKeyBytes) {
- if (rawKeyBytes.length < keyPrefixBytes.length) {
- return false;
- }
-
- for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
- if (rawKeyBytes[i] != keyPrefixBytes[i]) {
- return false;
- }
- }
-
- return true;
- }
}
}