You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2022/03/29 16:07:42 UTC
[ozone] branch HDDS-3630 updated: HDDS-6428. Add prefix iterator support to RDBTable. (#3176)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch HDDS-3630
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3630 by this push:
new b22dfd7 HDDS-6428. Add prefix iterator support to RDBTable. (#3176)
b22dfd7 is described below
commit b22dfd7e86432edf580790230e5399da85707cfa
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed Mar 30 00:07:03 2022 +0800
HDDS-6428. Add prefix iterator support to RDBTable. (#3176)
---
.../container/keyvalue/KeyValueContainerData.java | 8 +-
.../keyvalue/helpers/KeyValueContainerUtil.java | 3 +-
.../container/keyvalue/impl/BlockManagerImpl.java | 2 +-
.../background/BlockDeletingService.java | 4 +-
.../ozone/container/metadata/DatanodeTable.java | 16 ++-
.../metadata/SchemaOneDeletedBlocksTable.java | 8 +-
.../container/common/TestBlockDeletingService.java | 3 +-
.../TestSchemaOneBackwardsCompatibility.java | 24 ++--
.../keyvalue/TestKeyValueBlockIterator.java | 9 +-
.../hadoop/hdds/utils/db/RDBStoreIterator.java | 54 ++++++++-
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 52 ++++----
.../org/apache/hadoop/hdds/utils/db/Table.java | 16 ++-
.../apache/hadoop/hdds/utils/db/TypedTable.java | 25 +++-
.../hadoop/hdds/utils/db/TestRDBStoreIterator.java | 54 +++++++++
.../hadoop/hdds/utils/db/TestRDBTableStore.java | 133 +++++++++++++++++++++
.../hadoop/hdds/scm/server/SCMCertStore.java | 6 +-
.../ozone/TestStorageContainerManagerHelper.java | 5 +-
.../apache/hadoop/ozone/debug/PrefixParser.java | 2 +-
18 files changed, 356 insertions(+), 68 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 55d8f6d..9ea38b5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -350,7 +350,9 @@ public class KeyValueContainerData extends ContainerData {
}
public KeyPrefixFilter getUnprefixedKeyFilter() {
- return new KeyPrefixFilter().addFilter(containerPrefix() + "#", true);
+ String schemaPrefix = containerPrefix();
+ return new KeyPrefixFilter().addFilter(
+ schemaPrefix == null ? "#" : schemaPrefix + "#", true);
}
public KeyPrefixFilter getDeletingBlockKeyFilter() {
@@ -368,11 +370,11 @@ public class KeyValueContainerData extends ContainerData {
/**
* Schema v3 use containerID as key prefix,
- * for other schemas just return empty.
+ * for other schemas just return null.
* @return
*/
public String containerPrefix() {
- return "";
+ return null;
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 365b44e..a1486c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -217,7 +217,8 @@ public final class KeyValueContainerUtil {
kvContainerData.getDeletingBlockKeyFilter();
int numPendingDeletionBlocks =
store.getBlockDataTable()
- .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
+ .getSequentialRangeKVs(kvContainerData.startKeyEmpty(),
+ Integer.MAX_VALUE, kvContainerData.containerPrefix(), filter)
.size();
kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 80e48e0..65c764b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -331,7 +331,7 @@ public class BlockManagerImpl implements BlockManager {
List<? extends Table.KeyValue<String, BlockData>> range =
db.getStore().getBlockDataTable()
.getSequentialRangeKVs(startKey, count,
- cData.getUnprefixedKeyFilter());
+ cData.containerPrefix(), cData.getUnprefixedKeyFilter());
for (Table.KeyValue<String, BlockData> entry : range) {
BlockData data = new BlockData(entry.getValue().getBlockID());
result.add(data);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index 36294d3..2df22b1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -317,7 +317,9 @@ public class BlockDeletingService extends BackgroundService {
KeyPrefixFilter filter = containerData.getDeletingBlockKeyFilter();
List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
blockDataTable
- .getSequentialRangeKVs(null, (int) blocksToDelete, filter);
+ .getSequentialRangeKVs(containerData.startKeyEmpty(),
+ (int) blocksToDelete, containerData.containerPrefix(),
+ filter);
if (toDeleteBlocks.isEmpty()) {
LOG.debug("No under deletion block found in container : {}",
containerData.getContainerID());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
index 2fc5576..37eb58d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
@@ -78,6 +78,14 @@ public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
}
@Override
+ public final TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(
+ KEY prefix) {
+ throw new UnsupportedOperationException("Iterating tables directly is not" +
+ " supported for datanode containers due to differing schema " +
+ "version.");
+ }
+
+ @Override
public String getName() throws IOException {
return table.getName();
}
@@ -109,18 +117,18 @@ public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
@Override
public List<? extends KeyValue<KEY, VALUE>> getRangeKVs(
- KEY startKey, int count,
+ KEY startKey, int count, KEY prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
- return table.getRangeKVs(startKey, count, filters);
+ return table.getRangeKVs(startKey, count, prefix, filters);
}
@Override
public List<? extends KeyValue<KEY, VALUE>> getSequentialRangeKVs(
- KEY startKey, int count,
+ KEY startKey, int count, KEY prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
- return table.getSequentialRangeKVs(startKey, count, filters);
+ return table.getSequentialRangeKVs(startKey, count, prefix, filters);
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java
index 3f86cd2..42858c8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java
@@ -97,7 +97,7 @@ public class SchemaOneDeletedBlocksTable extends DatanodeTable<String,
@Override
public List<? extends KeyValue<String, ChunkInfoList>> getRangeKVs(
- String startKey, int count,
+ String startKey, int count, String prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
@@ -105,12 +105,12 @@ public class SchemaOneDeletedBlocksTable extends DatanodeTable<String,
// else in this schema version. Ignore any user passed prefixes that could
// collide with this and return results that are not deleted blocks.
return unprefix(super.getRangeKVs(prefix(startKey), count,
- getDeletedFilter()));
+ prefix, getDeletedFilter()));
}
@Override
public List<? extends KeyValue<String, ChunkInfoList>> getSequentialRangeKVs(
- String startKey, int count,
+ String startKey, int count, String prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
@@ -118,7 +118,7 @@ public class SchemaOneDeletedBlocksTable extends DatanodeTable<String,
// else in this schema version. Ignore any user passed prefixes that could
// collide with this and return results that are not deleted blocks.
return unprefix(super.getSequentialRangeKVs(prefix(startKey), count,
- getDeletedFilter()));
+ prefix, getDeletedFilter()));
}
private static String prefix(String key) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 6d2e4ac..38e6297 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -376,7 +376,8 @@ public class TestBlockDeletingService {
KeyValueContainerData data) throws IOException {
if (data.getSchemaVersion().equals(SCHEMA_V1)) {
return meta.getStore().getBlockDataTable()
- .getRangeKVs(null, 100, data.getDeletingBlockKeyFilter())
+ .getRangeKVs(null, 100, data.containerPrefix(),
+ data.getDeletingBlockKeyFilter())
.size();
} else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
int pendingBlocks = 0;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index ff013da..22767f0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -177,7 +177,8 @@ public class TestSchemaOneBackwardsCompatibility {
// Test rangeKVs.
List<? extends Table.KeyValue<String, ChunkInfoList>> deletedBlocks =
- deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100);
+ deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100,
+ cData.containerPrefix());
for (Table.KeyValue<String, ChunkInfoList> kv: deletedBlocks) {
assertFalse(kv.getKey().contains(prefix));
@@ -185,7 +186,7 @@ public class TestSchemaOneBackwardsCompatibility {
// Test sequentialRangeKVs.
deletedBlocks = deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(),
- 100);
+ 100, cData.containerPrefix());
for (Table.KeyValue<String, ChunkInfoList> kv: deletedBlocks) {
assertFalse(kv.getKey().contains(prefix));
@@ -330,7 +331,8 @@ public class TestSchemaOneBackwardsCompatibility {
// Read blocks that were already deleted before the upgrade.
List<? extends Table.KeyValue<String, ChunkInfoList>> deletedBlocks =
refCountedDB.getStore().getDeletedBlocksTable()
- .getRangeKVs(cData.startKeyEmpty(), 100);
+ .getRangeKVs(cData.startKeyEmpty(), 100,
+ cData.containerPrefix());
Set<String> preUpgradeBlocks = new HashSet<>();
@@ -391,7 +393,7 @@ public class TestSchemaOneBackwardsCompatibility {
// Test decoding keys from the database.
List<? extends Table.KeyValue<String, BlockData>> blockKeyValues =
blockDataTable.getRangeKVs(cData.startKeyEmpty(), 100,
- cData.getUnprefixedKeyFilter());
+ cData.containerPrefix(), cData.getUnprefixedKeyFilter());
List<String> decodedKeys = new ArrayList<>();
@@ -434,7 +436,7 @@ public class TestSchemaOneBackwardsCompatibility {
// Test decoding keys from the database.
List<? extends Table.KeyValue<String, BlockData>> blockKeyValues =
blockDataTable.getRangeKVs(cData.startKeyEmpty(), 100,
- cData.getDeletingBlockKeyFilter());
+ cData.containerPrefix(), cData.getDeletingBlockKeyFilter());
List<String> decodedKeys = new ArrayList<>();
@@ -502,7 +504,8 @@ public class TestSchemaOneBackwardsCompatibility {
// Test decoding keys from the database.
List<? extends Table.KeyValue<String, ChunkInfoList>> chunkInfoKeyValues =
- deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100);
+ deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100,
+ cData.containerPrefix());
List<String> decodedKeys = new ArrayList<>();
@@ -600,7 +603,8 @@ public class TestSchemaOneBackwardsCompatibility {
throws IOException {
return refCountedDB.getStore().getDeletedBlocksTable()
.getRangeKVs(cData.startKeyEmpty(), 100,
- cData.getUnprefixedKeyFilter()).size();
+ cData.containerPrefix(),
+ cData.getUnprefixedKeyFilter()).size();
}
private int countDeletingBlocks(DBHandle refCountedDB,
@@ -608,7 +612,8 @@ public class TestSchemaOneBackwardsCompatibility {
throws IOException {
return refCountedDB.getStore().getBlockDataTable()
.getRangeKVs(cData.startKeyEmpty(), 100,
- cData.getDeletingBlockKeyFilter()).size();
+ cData.containerPrefix(),
+ cData.getDeletingBlockKeyFilter()).size();
}
private int countUnprefixedBlocks(DBHandle refCountedDB,
@@ -616,7 +621,8 @@ public class TestSchemaOneBackwardsCompatibility {
throws IOException {
return refCountedDB.getStore().getBlockDataTable()
.getRangeKVs(cData.startKeyEmpty(), 100,
- cData.getUnprefixedKeyFilter()).size();
+ cData.containerPrefix(),
+ cData.getUnprefixedKeyFilter()).size();
}
/**
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
index afd5fc2..9c3cd19 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
@@ -285,9 +285,11 @@ public class TestKeyValueBlockIterator {
blockIDs.get(OzoneConsts.DELETING_KEY_PREFIX));
// Test arbitrary filter.
+ String schemaPrefix = containerData.containerPrefix();
MetadataKeyFilters.KeyPrefixFilter secondFilter =
new MetadataKeyFilters.KeyPrefixFilter()
- .addFilter(containerData.containerPrefix() + secondPrefix);
+ .addFilter(schemaPrefix == null ?
+ secondPrefix : schemaPrefix + secondPrefix);
testWithFilter(secondFilter, blockIDs.get(secondPrefix));
}
@@ -383,6 +385,7 @@ public class TestKeyValueBlockIterator {
// prefix.
Table<String, BlockData> blockDataTable =
metadataStore.getStore().getBlockDataTable();
+ String schemaPrefix = containerData.containerPrefix();
for (Map.Entry<String, Integer> entry: prefixCounts.entrySet()) {
String prefix = entry.getKey();
@@ -394,8 +397,8 @@ public class TestKeyValueBlockIterator {
blockIndex++;
BlockData blockData = new BlockData(blockID);
blockData.setChunks(chunkList);
- String blockKey = containerData.containerPrefix() + prefix +
- blockID.getLocalID();
+ String blockKey = (schemaPrefix == null ? "" : schemaPrefix) +
+ prefix + blockID.getLocalID();
blockDataTable.put(blockKey, blockData);
blockIDs.get(prefix).add(blockID.getLocalID());
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
index dbe5625..8ef2b1d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hdds.utils.db;
import java.io.IOException;
+import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
@@ -39,15 +40,26 @@ public class RDBStoreIterator
private final RocksIterator rocksDBIterator;
private RDBTable rocksDBTable;
private ByteArrayKeyValue currentEntry;
+ // This is for schemas that use a fixed-length
+ // prefix for each key.
+ private byte[] prefix;
public RDBStoreIterator(RocksIterator iterator) {
- this.rocksDBIterator = iterator;
- seekToFirst();
+ this(iterator, null);
}
public RDBStoreIterator(RocksIterator iterator, RDBTable table) {
- this(iterator);
+ this(iterator, table, null);
+ }
+
+ public RDBStoreIterator(RocksIterator iterator, RDBTable table,
+ byte[] prefix) {
+ this.rocksDBIterator = iterator;
this.rocksDBTable = table;
+ if (prefix != null) {
+ this.prefix = Arrays.copyOf(prefix, prefix.length);
+ }
+ seekToFirst();
}
@Override
@@ -69,7 +81,8 @@ public class RDBStoreIterator
@Override
public boolean hasNext() {
- return rocksDBIterator.isValid();
+ return rocksDBIterator.isValid() &&
+ (prefix == null || startsWith(prefix, rocksDBIterator.key()));
}
@Override
@@ -84,13 +97,21 @@ public class RDBStoreIterator
@Override
public void seekToFirst() {
- rocksDBIterator.seekToFirst();
+ if (prefix == null) {
+ rocksDBIterator.seekToFirst();
+ } else {
+ rocksDBIterator.seek(prefix);
+ }
setCurrentEntry();
}
@Override
public void seekToLast() {
- rocksDBIterator.seekToLast();
+ if (prefix == null) {
+ rocksDBIterator.seekToLast();
+ } else {
+ throw new UnsupportedOperationException("seekToLast");
+ }
setCurrentEntry();
}
@@ -117,4 +138,25 @@ public class RDBStoreIterator
public void close() throws IOException {
rocksDBIterator.close();
}
+
+ private static boolean startsWith(byte[] prefix, byte[] value) {
+ if (prefix == null) {
+ return true;
+ }
+ if (value == null) {
+ return false;
+ }
+
+ int length = prefix.length;
+ if (value.length < length) {
+ return false;
+ }
+
+ for (int i = 0; i < length; i++) {
+ if (value[i] != prefix[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 3e28170..c011cfe 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -34,7 +34,6 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
-import org.rocksdb.RocksIterator;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -214,6 +213,14 @@ class RDBTable implements Table<byte[], byte[]> {
}
@Override
+ public TableIterator<byte[], ByteArrayKeyValue> iterator(byte[] prefix) {
+ ReadOptions readOptions = new ReadOptions();
+ readOptions.setFillCache(false);
+ return new RDBStoreIterator(db.newIterator(handle, readOptions), this,
+ prefix);
+ }
+
+ @Override
public String getName() throws IOException {
try {
return StringUtils.bytes2String(this.getHandle().getName());
@@ -239,58 +246,57 @@ class RDBTable implements Table<byte[], byte[]> {
@Override
public List<ByteArrayKeyValue> getRangeKVs(byte[] startKey,
- int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+ int count, byte[] prefix,
+ MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
- return getRangeKVs(startKey, count, false, filters);
+ return getRangeKVs(startKey, count, false, prefix, filters);
}
@Override
public List<ByteArrayKeyValue> getSequentialRangeKVs(byte[] startKey,
- int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+ int count, byte[] prefix,
+ MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
- return getRangeKVs(startKey, count, true, filters);
+ return getRangeKVs(startKey, count, true, prefix, filters);
}
private List<ByteArrayKeyValue> getRangeKVs(byte[] startKey,
- int count, boolean sequential,
+ int count, boolean sequential, byte[] prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
List<ByteArrayKeyValue> result = new ArrayList<>();
long start = System.currentTimeMillis();
+
if (count < 0) {
throw new IllegalArgumentException(
"Invalid count given " + count + ", count must be greater than 0");
}
- try (RocksIterator it = db.newIterator(handle)) {
+ try (TableIterator<byte[], ByteArrayKeyValue> it = iterator(prefix)) {
if (startKey == null) {
it.seekToFirst();
} else {
- if (get(startKey) == null) {
+ if ((prefix == null || startKey.length > prefix.length)
+ && get(startKey) == null) {
// Key not found, return empty list
return result;
}
it.seek(startKey);
}
- while (it.isValid() && result.size() < count) {
- byte[] currentKey = it.key();
- byte[] currentValue = it.value();
-
- it.prev();
- final byte[] prevKey = it.isValid() ? it.key() : null;
- it.seek(currentKey);
- it.next();
- final byte[] nextKey = it.isValid() ? it.key() : null;
+ while (it.hasNext() && result.size() < count) {
+ ByteArrayKeyValue currentEntry = it.next();
+ byte[] currentKey = currentEntry.getKey();
if (filters == null) {
- result.add(ByteArrayKeyValue
- .create(currentKey, currentValue));
+ result.add(currentEntry);
} else {
+ // NOTE: prevKey and nextKey are never checked by any
+ // of the existing underlying filters, so they can
+ // safely be passed as null here.
if (Arrays.stream(filters)
- .allMatch(entry -> entry.filterKey(prevKey,
- currentKey, nextKey))) {
- result.add(ByteArrayKeyValue
- .create(currentKey, currentValue));
+ .allMatch(entry -> entry.filterKey(null,
+ currentKey, null))) {
+ result.add(currentEntry);
} else {
if (result.size() > 0 && sequential) {
// if the caller asks for a sequential range of results,
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 554af5a..bdaaa96 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -154,6 +154,14 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator();
/**
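+ * Returns an iterator limited to entries whose key starts with the prefix.
+ * @param prefix key prefix that the returned entries must share.
+ * @return a table iterator restricted to keys with the given prefix.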
+ */
+ TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix)
+ throws IOException;
+
+ /**
* Returns the Name of this Table.
* @return - Table Name.
* @throws IOException on failure.
@@ -228,6 +236,7 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
*
* @param startKey a start key.
* @param count max number of entries to return.
+ * @param prefix schema-specific fixed-length key prefix.
* @param filters customized one or more
* {@link MetadataKeyFilters.MetadataKeyFilter}.
* @return a list of entries found in the database or an empty list if the
@@ -236,7 +245,8 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
* @throws IllegalArgumentException if count is less than 0.
*/
List<? extends KeyValue<KEY, VALUE>> getRangeKVs(KEY startKey,
- int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+ int count, KEY prefix,
+ MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException;
/**
@@ -249,6 +259,7 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
*
* @param startKey a start key.
* @param count max number of entries to return.
+ * @param prefix schema-specific fixed-length key prefix.
* @param filters customized one or more
* {@link MetadataKeyFilters.MetadataKeyFilter}.
* @return a list of entries found in the database.
@@ -256,7 +267,8 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
* @throws IllegalArgumentException
*/
List<? extends KeyValue<KEY, VALUE>> getSequentialRangeKVs(KEY startKey,
- int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+ int count, KEY prefix,
+ MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException;
/**
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index c7f6196..3316536 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -281,6 +281,14 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
}
@Override
+ public TableIterator<KEY, TypedKeyValue> iterator(KEY prefix)
+ throws IOException {
+ TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> iterator =
+ rawTable.iterator(codecRegistry.asRawData(prefix));
+ return new TypedTableIterator(iterator, keyType, valueType);
+ }
+
+ @Override
public String getName() throws IOException {
return rawTable.getName();
}
@@ -315,19 +323,23 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
@Override
public List<TypedKeyValue> getRangeKVs(
- KEY startKey, int count,
+ KEY startKey, int count, KEY prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
// A null start key means to start from the beginning of the table.
// Cannot convert a null key to bytes.
byte[] startKeyBytes = null;
+ byte[] prefixBytes = null;
if (startKey != null) {
startKeyBytes = codecRegistry.asRawData(startKey);
}
+ if (prefix != null) {
+ prefixBytes = codecRegistry.asRawData(prefix);
+ }
List<? extends KeyValue<byte[], byte[]>> rangeKVBytes =
- rawTable.getRangeKVs(startKeyBytes, count, filters);
+ rawTable.getRangeKVs(startKeyBytes, count, prefixBytes, filters);
List<TypedKeyValue> rangeKVs = new ArrayList<>();
rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(byteKV)));
@@ -337,19 +349,24 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
@Override
public List<TypedKeyValue> getSequentialRangeKVs(
- KEY startKey, int count,
+ KEY startKey, int count, KEY prefix,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
// A null start key means to start from the beginning of the table.
// Cannot convert a null key to bytes.
byte[] startKeyBytes = null;
+ byte[] prefixBytes = null;
if (startKey != null) {
startKeyBytes = codecRegistry.asRawData(startKey);
}
+ if (prefix != null) {
+ prefixBytes = codecRegistry.asRawData(prefix);
+ }
List<? extends KeyValue<byte[], byte[]>> rangeKVBytes =
- rawTable.getSequentialRangeKVs(startKeyBytes, count, filters);
+ rawTable.getSequentialRangeKVs(startKeyBytes, count,
+ prefixBytes, filters);
List<TypedKeyValue> rangeKVs = new ArrayList<>();
rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(byteKV)));
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
index b49556d..2e188ab 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
@@ -24,13 +24,17 @@ import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.rocksdb.RocksIterator;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -234,4 +238,54 @@ public class TestRDBStoreIterator {
verify(rocksDBIteratorMock, times(1)).close();
}
+
+ @Test
+ public void testNullPrefixedIterator() throws IOException {
+ RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock,
+ rocksTableMock, null);
+ verify(rocksDBIteratorMock, times(1)).seekToFirst();
+ clearInvocations(rocksDBIteratorMock);
+
+ iter.seekToFirst();
+ verify(rocksDBIteratorMock, times(1)).seekToFirst();
+ clearInvocations(rocksDBIteratorMock);
+
+ when(rocksDBIteratorMock.isValid()).thenReturn(true);
+ assertTrue(iter.hasNext());
+ verify(rocksDBIteratorMock, times(1)).isValid();
+ verify(rocksDBIteratorMock, times(0)).key();
+
+ iter.seekToLast();
+ verify(rocksDBIteratorMock, times(1)).seekToLast();
+
+ iter.close();
+ }
+
+ @Test
+ public void testNormalPrefixedIterator() throws IOException {
+ byte[] testPrefix = "sample".getBytes(StandardCharsets.UTF_8);
+ RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock,
+ rocksTableMock, testPrefix);
+ verify(rocksDBIteratorMock, times(1)).seek(testPrefix);
+ clearInvocations(rocksDBIteratorMock);
+
+ iter.seekToFirst();
+ verify(rocksDBIteratorMock, times(1)).seek(testPrefix);
+ clearInvocations(rocksDBIteratorMock);
+
+ when(rocksDBIteratorMock.isValid()).thenReturn(true);
+ when(rocksDBIteratorMock.key()).thenReturn(testPrefix);
+ assertTrue(iter.hasNext());
+ verify(rocksDBIteratorMock, times(1)).isValid();
+ verify(rocksDBIteratorMock, times(1)).key();
+
+ try {
+ iter.seekToLast();
+ fail("Prefixed iterator does not support seekToLast");
+ } catch (Exception e) {
+ assertTrue(e instanceof UnsupportedOperationException);
+ }
+
+ iter.close();
+ }
}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index 0f1858b..387c299 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -24,13 +24,16 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -54,6 +57,10 @@ public class TestRDBTableStore {
"Fourth", "Fifth",
"Sixth", "Seventh",
"Eighth");
+ private final List<String> prefixedFamilies = Arrays.asList(
+ "PrefixFirst"
+ );
+ private static final int PREFIX_LENGTH = 8;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private RDBStore rdbStore = null;
@@ -84,6 +91,13 @@ public class TestRDBTableStore {
TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
configSet.add(newConfig);
}
+ for (String name : prefixedFamilies) {
+ ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
+ cfOptions.useFixedLengthPrefixExtractor(PREFIX_LENGTH);
+
+ TableConfig newConfig = new TableConfig(name, cfOptions);
+ configSet.add(newConfig);
+ }
rdbStore = new RDBStore(folder.newFolder(), options, configSet);
}
@@ -425,4 +439,123 @@ public class TestRDBTableStore {
testTable.put(key, value);
}
}
+
+ /**
+  * Verifies that an iterator opened with a key prefix only returns keys
+  * carrying that prefix, stops once those keys are exhausted, and can be
+  * rewound to the first prefixed key with seekToFirst().
+  */
+ @Test
+ public void testPrefixedIterator() throws Exception {
+   int containerCount = 3;
+   int blockCount = 5;
+   List<String> testPrefixes = generatePrefixes(containerCount);
+   List<Map<String, String>> testData = generateKVs(testPrefixes, blockCount);
+
+   try (Table<byte[], byte[]> testTable = rdbStore.getTable("PrefixFirst")) {
+     // write data for every generated prefix
+     populatePrefixedTable(testTable, testData);
+
+     // iterate over just one of the generated prefixes; keys written under
+     // the other prefixes must not show up
+     byte[] samplePrefix = testPrefixes.get(2).getBytes(
+         StandardCharsets.UTF_8);
+     try (TableIterator<byte[],
+         ? extends Table.KeyValue<byte[], byte[]>> iter = testTable.iterator(
+         samplePrefix)) {
+       int keyCount = 0;
+       while (iter.hasNext()) {
+         // iterator should only meet keys with samplePrefix;
+         // assertArrayEquals reports the differing bytes on failure
+         Assert.assertArrayEquals(samplePrefix,
+             Arrays.copyOf(iter.next().getKey(), PREFIX_LENGTH));
+         keyCount++;
+       }
+
+       // iterator should end at right pos: exactly the keys of one prefix
+       Assert.assertEquals(blockCount, keyCount);
+
+       // iterator should be able to seekToFirst and land on a prefixed key
+       iter.seekToFirst();
+       Assert.assertTrue(iter.hasNext());
+       Assert.assertArrayEquals(samplePrefix,
+           Arrays.copyOf(iter.next().getKey(), PREFIX_LENGTH));
+     }
+   }
+ }
+
+ /**
+  * Exercises getRangeKVs with a key prefix: starting at the prefix itself,
+  * starting at a key in the middle, combined with a key-prefix filter, and
+  * starting at a key that was never written.
+  */
+ @Test
+ public void testPrefixedRangeKVs() throws Exception {
+   int containerCount = 3;
+   int blockCount = 5;
+   List<String> prefixes = generatePrefixes(containerCount);
+   List<Map<String, String>> kvData = generateKVs(prefixes, blockCount);
+
+   try (Table<byte[], byte[]> testTable = rdbStore.getTable("PrefixFirst")) {
+
+     // write data
+     populatePrefixedTable(testTable, kvData);
+
+     byte[] samplePrefix = prefixes.get(2).getBytes(StandardCharsets.UTF_8);
+     String prefixText = StringUtils.bytes2String(samplePrefix);
+
+     // start at the prefix itself and cap the result at 3 entries
+     List<? extends Table.KeyValue<byte[], byte[]>> fromStart =
+         testTable.getRangeKVs(samplePrefix, 3, samplePrefix);
+     Assert.assertEquals(3, fromStart.size());
+
+     // start at the key with suffix "3"; two entries are expected back
+     byte[] middleKey = StringUtils.string2Bytes(prefixText + "3");
+     List<? extends Table.KeyValue<byte[], byte[]>> fromMiddle =
+         testTable.getRangeKVs(middleKey, blockCount, samplePrefix);
+     Assert.assertEquals(2, fromMiddle.size());
+
+     // add a filter that only accepts keys beginning with prefix + "1"
+     MetadataKeyFilters.KeyPrefixFilter filter1 =
+         new MetadataKeyFilters.KeyPrefixFilter().addFilter(prefixText + "1");
+     byte[] prefixKey = StringUtils.string2Bytes(prefixText);
+     List<? extends Table.KeyValue<byte[], byte[]>> filtered =
+         testTable.getRangeKVs(prefixKey, blockCount, samplePrefix, filter1);
+     Assert.assertEquals(1, filtered.size());
+
+     // start at a key that was never written; nothing is expected back
+     byte[] missingKey = StringUtils.string2Bytes(prefixText + "123");
+     List<? extends Table.KeyValue<byte[], byte[]>> fromMissing =
+         testTable.getRangeKVs(missingKey, 10, samplePrefix);
+     Assert.assertEquals(0, fromMissing.size());
+   }
+ }
+
+ /**
+  * Creates random key prefixes, each PREFIX_LENGTH alphabetic characters long.
+  *
+  * @param prefixCount number of prefixes to generate
+  * @return the generated prefixes, in generation order
+  */
+ private List<String> generatePrefixes(int prefixCount) {
+   List<String> result = new ArrayList<>();
+   // alphabetic chars are used so that each prefix has a fixed length
+   // once converted to byte[]
+   while (result.size() < prefixCount) {
+     result.add(RandomStringUtils.randomAlphabetic(PREFIX_LENGTH));
+   }
+   return result;
+ }
+
+ /**
+  * Builds one key/value map per prefix. Keys are the prefix followed by an
+  * index from 0 to keyCount - 1; values are random 10-character strings.
+  *
+  * @param prefixes key prefixes, one map is produced for each
+  * @param keyCount number of keys to generate under each prefix
+  * @return the generated maps, in the same order as {@code prefixes}
+  */
+ private List<Map<String, String>> generateKVs(List<String> prefixes,
+     int keyCount) {
+   List<Map<String, String>> segments = new ArrayList<>();
+   for (String prefix : prefixes) {
+     Map<String, String> segment = new HashMap<>();
+     int index = 0;
+     while (index < keyCount) {
+       segment.put(prefix + index, RandomStringUtils.random(10));
+       index++;
+     }
+     segments.add(segment);
+   }
+   return segments;
+ }
+
+ /**
+  * Writes every entry of every map in {@code testData} into the table,
+  * encoding both keys and values as UTF-8 bytes.
+  *
+  * @param table table to write into
+  * @param testData key/value maps to store
+  * @throws IOException if a put on the table fails
+  */
+ private void populatePrefixedTable(Table<byte[], byte[]> table,
+     List<Map<String, String>> testData) throws IOException {
+   for (Map<String, String> kvs : testData) {
+     for (Map.Entry<String, String> kv : kvs.entrySet()) {
+       byte[] keyBytes = kv.getKey().getBytes(StandardCharsets.UTF_8);
+       byte[] valueBytes = kv.getValue().getBytes(StandardCharsets.UTF_8);
+       table.put(keyBytes, valueBytes);
+     }
+   }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
index d199c41..680bf43 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
@@ -265,7 +265,7 @@ public final class SCMCertStore implements CertificateStore {
} else {
List<? extends Table.KeyValue<BigInteger, CertInfo>> certs =
scmMetadataStore.getRevokedCertsV2Table().getRangeKVs(
- startSerialID, count);
+ startSerialID, count, null);
for (Table.KeyValue<BigInteger, CertInfo> kv : certs) {
try {
@@ -290,10 +290,10 @@ public final class SCMCertStore implements CertificateStore {
if (role == SCM) {
return scmMetadataStore.getValidSCMCertsTable().getRangeKVs(
- startSerialID, count);
+ startSerialID, count, null);
} else {
return scmMetadataStore.getValidCertsTable().getRangeKVs(
- startSerialID, count);
+ startSerialID, count, null);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index 992bf40..6b00312 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -107,7 +107,8 @@ public class TestStorageContainerManagerHelper {
List<? extends Table.KeyValue<String, BlockData>> kvs =
db.getStore().getBlockDataTable()
- .getRangeKVs(cData.startKeyEmpty(), Integer.MAX_VALUE, filter);
+ .getRangeKVs(cData.startKeyEmpty(), Integer.MAX_VALUE,
+ cData.containerPrefix(), filter);
for (Table.KeyValue<String, BlockData> entry : kvs) {
pendingDeletionBlocks
@@ -134,7 +135,7 @@ public class TestStorageContainerManagerHelper {
List<? extends Table.KeyValue<String, BlockData>> kvs =
db.getStore().getBlockDataTable()
.getRangeKVs(cData.startKeyEmpty(), Integer.MAX_VALUE,
- cData.getUnprefixedKeyFilter());
+ cData.containerPrefix(), cData.getUnprefixedKeyFilter());
for (Table.KeyValue<String, BlockData> entry : kvs) {
allBlocks.add(Long.valueOf(entry.getKey()));
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
index 0ebc832..db43cc7 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
@@ -217,7 +217,7 @@ public class PrefixParser implements Callable<Void>, SubcommandWithParent {
List<? extends KeyValue
<String, ? extends WithParentObjectId>> infoList =
- table.getRangeKVs(null, 1000, filter);
+ table.getRangeKVs(null, 1000, null, filter);
for (KeyValue<String, ? extends WithParentObjectId> info :infoList) {
Path key = Paths.get(info.getKey());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org