Posted to commits@ozone.apache.org by na...@apache.org on 2022/03/29 16:07:42 UTC

[ozone] branch HDDS-3630 updated: HDDS-6428. Add prefix iterator support to RDBTable. (#3176)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch HDDS-3630
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3630 by this push:
     new b22dfd7  HDDS-6428. Add prefix iterator support to RDBTable. (#3176)
b22dfd7 is described below

commit b22dfd7e86432edf580790230e5399da85707cfa
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed Mar 30 00:07:03 2022 +0800

    HDDS-6428. Add prefix iterator support to RDBTable. (#3176)
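
For orientation: the patch adds Table#iterator(prefix) and threads a
prefix argument through getRangeKVs/getSequentialRangeKVs. A minimal
usage sketch of the new iterator (hypothetical table and variable
names, not part of the patch; assumes the caller handles IOException):

    // Iterate only one container's block entries in a schema v3
    // datanode DB; keys outside the prefix are never returned.
    try (TableIterator<String, ? extends Table.KeyValue<String, BlockData>>
             iter = blockDataTable.iterator(containerData.containerPrefix())) {
      while (iter.hasNext()) {
        Table.KeyValue<String, BlockData> kv = iter.next();
        process(kv.getKey(), kv.getValue());  // process() is assumed
      }
    }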
---
 .../container/keyvalue/KeyValueContainerData.java  |   8 +-
 .../keyvalue/helpers/KeyValueContainerUtil.java    |   3 +-
 .../container/keyvalue/impl/BlockManagerImpl.java  |   2 +-
 .../background/BlockDeletingService.java           |   4 +-
 .../ozone/container/metadata/DatanodeTable.java    |  16 ++-
 .../metadata/SchemaOneDeletedBlocksTable.java      |   8 +-
 .../container/common/TestBlockDeletingService.java |   3 +-
 .../TestSchemaOneBackwardsCompatibility.java       |  24 ++--
 .../keyvalue/TestKeyValueBlockIterator.java        |   9 +-
 .../hadoop/hdds/utils/db/RDBStoreIterator.java     |  54 ++++++++-
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |  52 ++++----
 .../org/apache/hadoop/hdds/utils/db/Table.java     |  16 ++-
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |  25 +++-
 .../hadoop/hdds/utils/db/TestRDBStoreIterator.java |  54 +++++++++
 .../hadoop/hdds/utils/db/TestRDBTableStore.java    | 133 +++++++++++++++++++++
 .../hadoop/hdds/scm/server/SCMCertStore.java       |   6 +-
 .../ozone/TestStorageContainerManagerHelper.java   |   5 +-
 .../apache/hadoop/ozone/debug/PrefixParser.java    |   2 +-
 18 files changed, 356 insertions(+), 68 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 55d8f6d..9ea38b5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -350,7 +350,9 @@ public class KeyValueContainerData extends ContainerData {
   }
 
   public KeyPrefixFilter getUnprefixedKeyFilter() {
-    return new KeyPrefixFilter().addFilter(containerPrefix() + "#", true);
+    String schemaPrefix = containerPrefix();
+    return new KeyPrefixFilter().addFilter(
+        schemaPrefix == null ? "#" : schemaPrefix + "#", true);
   }
 
   public KeyPrefixFilter getDeletingBlockKeyFilter() {
@@ -368,11 +370,11 @@ public class KeyValueContainerData extends ContainerData {
 
   /**
   * Schema v3 uses containerID as the key prefix,
-   * for other schemas just return empty.
+   * for other schemas just return null.
    * @return
    */
   public String containerPrefix() {
-    return "";
+    return null;
   }
 
   /**
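
The null return above matters to callers: a missing schema prefix is
now represented as null rather than an empty string that silently
concatenates into filters. A hedged illustration of the convention
(names from the patch, surrounding code assumed):

    // Schema v3 overrides containerPrefix(); older schemas return null.
    String schemaPrefix = containerData.containerPrefix();
    KeyPrefixFilter filter = new KeyPrefixFilter().addFilter(
        schemaPrefix == null ? "#" : schemaPrefix + "#", true);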
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 365b44e..a1486c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -217,7 +217,8 @@ public final class KeyValueContainerUtil {
                 kvContainerData.getDeletingBlockKeyFilter();
         int numPendingDeletionBlocks =
             store.getBlockDataTable()
-            .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
+            .getSequentialRangeKVs(kvContainerData.startKeyEmpty(),
+                Integer.MAX_VALUE, kvContainerData.containerPrefix(), filter)
             .size();
         kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 80e48e0..65c764b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -331,7 +331,7 @@ public class BlockManagerImpl implements BlockManager {
         List<? extends Table.KeyValue<String, BlockData>> range =
             db.getStore().getBlockDataTable()
                 .getSequentialRangeKVs(startKey, count,
-                    cData.getUnprefixedKeyFilter());
+                    cData.containerPrefix(), cData.getUnprefixedKeyFilter());
         for (Table.KeyValue<String, BlockData> entry : range) {
           BlockData data = new BlockData(entry.getValue().getBlockID());
           result.add(data);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index 36294d3..2df22b1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -317,7 +317,9 @@ public class BlockDeletingService extends BackgroundService {
         KeyPrefixFilter filter = containerData.getDeletingBlockKeyFilter();
         List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
             blockDataTable
-                .getSequentialRangeKVs(null, (int) blocksToDelete, filter);
+                .getSequentialRangeKVs(containerData.startKeyEmpty(),
+                    (int) blocksToDelete, containerData.containerPrefix(),
+                    filter);
         if (toDeleteBlocks.isEmpty()) {
           LOG.debug("No under deletion block found in container : {}",
               containerData.getContainerID());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
index 2fc5576..37eb58d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
@@ -78,6 +78,14 @@ public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
   }
 
   @Override
+  public final TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(
+      KEY prefix) {
+    throw new UnsupportedOperationException("Iterating tables directly is not" +
+        " supported for datanode containers due to differing schema " +
+        "version.");
+  }
+
+  @Override
   public String getName() throws IOException {
     return table.getName();
   }
@@ -109,18 +117,18 @@ public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   @Override
   public List<? extends KeyValue<KEY, VALUE>> getRangeKVs(
-          KEY startKey, int count,
+          KEY startKey, int count, KEY prefix,
           MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException {
-    return table.getRangeKVs(startKey, count, filters);
+    return table.getRangeKVs(startKey, count, prefix, filters);
   }
 
   @Override
   public List<? extends KeyValue<KEY, VALUE>> getSequentialRangeKVs(
-          KEY startKey, int count,
+          KEY startKey, int count, KEY prefix,
           MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException {
-    return table.getSequentialRangeKVs(startKey, count, filters);
+    return table.getSequentialRangeKVs(startKey, count, prefix, filters);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java
index 3f86cd2..42858c8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java
@@ -97,7 +97,7 @@ public class SchemaOneDeletedBlocksTable extends DatanodeTable<String,
 
   @Override
   public List<? extends KeyValue<String, ChunkInfoList>> getRangeKVs(
-          String startKey, int count,
+          String startKey, int count, String prefix,
           MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException {
 
@@ -105,12 +105,12 @@ public class SchemaOneDeletedBlocksTable extends DatanodeTable<String,
     // else in this schema version. Ignore any user passed prefixes that could
     // collide with this and return results that are not deleted blocks.
     return unprefix(super.getRangeKVs(prefix(startKey), count,
-            getDeletedFilter()));
+        prefix, getDeletedFilter()));
   }
 
   @Override
   public List<? extends KeyValue<String, ChunkInfoList>> getSequentialRangeKVs(
-          String startKey, int count,
+          String startKey, int count, String prefix,
           MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException {
 
@@ -118,7 +118,7 @@ public class SchemaOneDeletedBlocksTable extends DatanodeTable<String,
     // else in this schema version. Ignore any user passed prefixes that could
     // collide with this and return results that are not deleted blocks.
     return unprefix(super.getSequentialRangeKVs(prefix(startKey), count,
-            getDeletedFilter()));
+        prefix, getDeletedFilter()));
   }
 
   private static String prefix(String key) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 6d2e4ac..38e6297 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -376,7 +376,8 @@ public class TestBlockDeletingService {
       KeyValueContainerData data) throws IOException {
     if (data.getSchemaVersion().equals(SCHEMA_V1)) {
       return meta.getStore().getBlockDataTable()
-          .getRangeKVs(null, 100, data.getDeletingBlockKeyFilter())
+          .getRangeKVs(null, 100, data.containerPrefix(),
+              data.getDeletingBlockKeyFilter())
           .size();
     } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
       int pendingBlocks = 0;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index ff013da..22767f0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -177,7 +177,8 @@ public class TestSchemaOneBackwardsCompatibility {
 
       // Test rangeKVs.
       List<? extends Table.KeyValue<String, ChunkInfoList>> deletedBlocks =
-              deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100);
+              deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100,
+                  cData.containerPrefix());
 
       for (Table.KeyValue<String, ChunkInfoList> kv: deletedBlocks) {
         assertFalse(kv.getKey().contains(prefix));
@@ -185,7 +186,7 @@ public class TestSchemaOneBackwardsCompatibility {
 
       // Test sequentialRangeKVs.
       deletedBlocks = deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(),
-          100);
+          100, cData.containerPrefix());
 
       for (Table.KeyValue<String, ChunkInfoList> kv: deletedBlocks) {
         assertFalse(kv.getKey().contains(prefix));
@@ -330,7 +331,8 @@ public class TestSchemaOneBackwardsCompatibility {
       // Read blocks that were already deleted before the upgrade.
       List<? extends Table.KeyValue<String, ChunkInfoList>> deletedBlocks =
               refCountedDB.getStore().getDeletedBlocksTable()
-                  .getRangeKVs(cData.startKeyEmpty(), 100);
+                  .getRangeKVs(cData.startKeyEmpty(), 100,
+                      cData.containerPrefix());
 
       Set<String> preUpgradeBlocks = new HashSet<>();
 
@@ -391,7 +393,7 @@ public class TestSchemaOneBackwardsCompatibility {
       // Test decoding keys from the database.
       List<? extends Table.KeyValue<String, BlockData>> blockKeyValues =
           blockDataTable.getRangeKVs(cData.startKeyEmpty(), 100,
-              cData.getUnprefixedKeyFilter());
+              cData.containerPrefix(), cData.getUnprefixedKeyFilter());
 
       List<String> decodedKeys = new ArrayList<>();
 
@@ -434,7 +436,7 @@ public class TestSchemaOneBackwardsCompatibility {
       // Test decoding keys from the database.
       List<? extends Table.KeyValue<String, BlockData>> blockKeyValues =
           blockDataTable.getRangeKVs(cData.startKeyEmpty(), 100,
-              cData.getDeletingBlockKeyFilter());
+              cData.containerPrefix(), cData.getDeletingBlockKeyFilter());
 
       List<String> decodedKeys = new ArrayList<>();
 
@@ -502,7 +504,8 @@ public class TestSchemaOneBackwardsCompatibility {
 
       // Test decoding keys from the database.
       List<? extends Table.KeyValue<String, ChunkInfoList>> chunkInfoKeyValues =
-          deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100);
+          deletedBlocksTable.getRangeKVs(cData.startKeyEmpty(), 100,
+              cData.containerPrefix());
 
       List<String> decodedKeys = new ArrayList<>();
 
@@ -600,7 +603,8 @@ public class TestSchemaOneBackwardsCompatibility {
           throws IOException {
     return refCountedDB.getStore().getDeletedBlocksTable()
             .getRangeKVs(cData.startKeyEmpty(), 100,
-                    cData.getUnprefixedKeyFilter()).size();
+                cData.containerPrefix(),
+                cData.getUnprefixedKeyFilter()).size();
   }
 
   private int countDeletingBlocks(DBHandle refCountedDB,
@@ -608,7 +612,8 @@ public class TestSchemaOneBackwardsCompatibility {
           throws IOException {
     return refCountedDB.getStore().getBlockDataTable()
             .getRangeKVs(cData.startKeyEmpty(), 100,
-                    cData.getDeletingBlockKeyFilter()).size();
+                cData.containerPrefix(),
+                cData.getDeletingBlockKeyFilter()).size();
   }
 
   private int countUnprefixedBlocks(DBHandle refCountedDB,
@@ -616,7 +621,8 @@ public class TestSchemaOneBackwardsCompatibility {
           throws IOException {
     return refCountedDB.getStore().getBlockDataTable()
             .getRangeKVs(cData.startKeyEmpty(), 100,
-                    cData.getUnprefixedKeyFilter()).size();
+                cData.containerPrefix(),
+                cData.getUnprefixedKeyFilter()).size();
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
index afd5fc2..9c3cd19 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
@@ -285,9 +285,11 @@ public class TestKeyValueBlockIterator {
             blockIDs.get(OzoneConsts.DELETING_KEY_PREFIX));
 
     // Test arbitrary filter.
+    String schemaPrefix = containerData.containerPrefix();
     MetadataKeyFilters.KeyPrefixFilter secondFilter =
             new MetadataKeyFilters.KeyPrefixFilter()
-            .addFilter(containerData.containerPrefix() + secondPrefix);
+            .addFilter(schemaPrefix == null ?
+                secondPrefix : schemaPrefix + secondPrefix);
     testWithFilter(secondFilter, blockIDs.get(secondPrefix));
   }
 
@@ -383,6 +385,7 @@ public class TestKeyValueBlockIterator {
       // prefix.
       Table<String, BlockData> blockDataTable =
               metadataStore.getStore().getBlockDataTable();
+      String schemaPrefix = containerData.containerPrefix();
 
       for (Map.Entry<String, Integer> entry: prefixCounts.entrySet()) {
         String prefix = entry.getKey();
@@ -394,8 +397,8 @@ public class TestKeyValueBlockIterator {
           blockIndex++;
           BlockData blockData = new BlockData(blockID);
           blockData.setChunks(chunkList);
-          String blockKey = containerData.containerPrefix() + prefix +
-              blockID.getLocalID();
+          String blockKey = (schemaPrefix == null ? "" : schemaPrefix) +
+              prefix + blockID.getLocalID();
           blockDataTable.put(blockKey, blockData);
           blockIDs.get(prefix).add(blockID.getLocalID());
         }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
index dbe5625..8ef2b1d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.utils.db;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.NoSuchElementException;
 import java.util.function.Consumer;
 
@@ -39,15 +40,26 @@ public class RDBStoreIterator
   private final RocksIterator rocksDBIterator;
   private RDBTable rocksDBTable;
   private ByteArrayKeyValue currentEntry;
+  // This is for schemas that use a fixed-length
+  // prefix for each key.
+  private byte[] prefix;
 
   public RDBStoreIterator(RocksIterator iterator) {
-    this.rocksDBIterator = iterator;
-    seekToFirst();
+    this(iterator, null);
   }
 
   public RDBStoreIterator(RocksIterator iterator, RDBTable table) {
-    this(iterator);
+    this(iterator, table, null);
+  }
+
+  public RDBStoreIterator(RocksIterator iterator, RDBTable table,
+      byte[] prefix) {
+    this.rocksDBIterator = iterator;
     this.rocksDBTable = table;
+    if (prefix != null) {
+      this.prefix = Arrays.copyOf(prefix, prefix.length);
+    }
+    seekToFirst();
   }
 
   @Override
@@ -69,7 +81,8 @@ public class RDBStoreIterator
 
   @Override
   public boolean hasNext() {
-    return rocksDBIterator.isValid();
+    return rocksDBIterator.isValid() &&
+        (prefix == null || startsWith(prefix, rocksDBIterator.key()));
   }
 
   @Override
@@ -84,13 +97,21 @@ public class RDBStoreIterator
 
   @Override
   public void seekToFirst() {
-    rocksDBIterator.seekToFirst();
+    if (prefix == null) {
+      rocksDBIterator.seekToFirst();
+    } else {
+      rocksDBIterator.seek(prefix);
+    }
     setCurrentEntry();
   }
 
   @Override
   public void seekToLast() {
-    rocksDBIterator.seekToLast();
+    if (prefix == null) {
+      rocksDBIterator.seekToLast();
+    } else {
+      throw new UnsupportedOperationException("seekToLast");
+    }
     setCurrentEntry();
   }
 
@@ -117,4 +138,25 @@ public class RDBStoreIterator
   public void close() throws IOException {
     rocksDBIterator.close();
   }
+
+  private static boolean startsWith(byte[] prefix, byte[] value) {
+    if (prefix == null) {
+      return true;
+    }
+    if (value == null) {
+      return false;
+    }
+
+    int length = prefix.length;
+    if (value.length < length) {
+      return false;
+    }
+
+    for (int i = 0; i < length; i++) {
+      if (value[i] != prefix[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
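
Taken together, the new fields and overrides implement the usual
RocksDB prefix-scan idiom: seek to the prefix bytes, then stop at the
first key that no longer starts with them. A standalone sketch of the
idiom (assumes a RocksIterator it, a byte[] prefix, and a consume()
callback; not the class itself):

    it.seek(prefix);
    while (it.isValid() && startsWith(prefix, it.key())) {
      consume(it.key(), it.value());
      it.next();
    }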
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 3e28170..c011cfe 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -34,7 +34,6 @@ import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
-import org.rocksdb.RocksIterator;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -214,6 +213,14 @@ class RDBTable implements Table<byte[], byte[]> {
   }
 
   @Override
+  public TableIterator<byte[], ByteArrayKeyValue> iterator(byte[] prefix) {
+    ReadOptions readOptions = new ReadOptions();
+    readOptions.setFillCache(false);
+    return new RDBStoreIterator(db.newIterator(handle, readOptions), this,
+        prefix);
+  }
+
+  @Override
   public String getName() throws IOException {
     try {
       return StringUtils.bytes2String(this.getHandle().getName());
@@ -239,58 +246,57 @@ class RDBTable implements Table<byte[], byte[]> {
 
   @Override
   public List<ByteArrayKeyValue> getRangeKVs(byte[] startKey,
-      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      int count, byte[] prefix,
+      MetadataKeyFilters.MetadataKeyFilter... filters)
       throws IOException, IllegalArgumentException {
-    return getRangeKVs(startKey, count, false, filters);
+    return getRangeKVs(startKey, count, false, prefix, filters);
   }
 
   @Override
   public List<ByteArrayKeyValue> getSequentialRangeKVs(byte[] startKey,
-      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      int count, byte[] prefix,
+      MetadataKeyFilters.MetadataKeyFilter... filters)
       throws IOException, IllegalArgumentException {
-    return getRangeKVs(startKey, count, true, filters);
+    return getRangeKVs(startKey, count, true, prefix, filters);
   }
 
   private List<ByteArrayKeyValue> getRangeKVs(byte[] startKey,
-      int count, boolean sequential,
+      int count, boolean sequential, byte[] prefix,
       MetadataKeyFilters.MetadataKeyFilter... filters)
       throws IOException, IllegalArgumentException {
     List<ByteArrayKeyValue> result = new ArrayList<>();
     long start = System.currentTimeMillis();
+
     if (count < 0) {
       throw new IllegalArgumentException(
             "Invalid count given " + count + ", count must be greater than 0");
     }
-    try (RocksIterator it = db.newIterator(handle)) {
+    try (TableIterator<byte[], ByteArrayKeyValue> it = iterator(prefix)) {
       if (startKey == null) {
         it.seekToFirst();
       } else {
-        if (get(startKey) == null) {
+        if ((prefix == null || startKey.length > prefix.length)
+            && get(startKey) == null) {
           // Key not found, return empty list
           return result;
         }
         it.seek(startKey);
       }
-      while (it.isValid() && result.size() < count) {
-        byte[] currentKey = it.key();
-        byte[] currentValue = it.value();
-
-        it.prev();
-        final byte[] prevKey = it.isValid() ? it.key() : null;
 
-        it.seek(currentKey);
-        it.next();
-        final byte[] nextKey = it.isValid() ? it.key() : null;
+      while (it.hasNext() && result.size() < count) {
+        ByteArrayKeyValue currentEntry = it.next();
+        byte[] currentKey = currentEntry.getKey();
 
         if (filters == null) {
-          result.add(ByteArrayKeyValue
-                  .create(currentKey, currentValue));
+          result.add(currentEntry);
         } else {
+          // NOTE: no existing filter implementation inspects the
+          // prevKey and nextKey arguments, so they can safely be
+          // passed as null here.
           if (Arrays.stream(filters)
-                  .allMatch(entry -> entry.filterKey(prevKey,
-                          currentKey, nextKey))) {
-            result.add(ByteArrayKeyValue
-                    .create(currentKey, currentValue));
+                  .allMatch(entry -> entry.filterKey(null,
+                          currentKey, null))) {
+            result.add(currentEntry);
           } else {
             if (result.size() > 0 && sequential) {
               // if the caller asks for a sequential range of results,
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 554af5a..bdaaa96 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -154,6 +154,14 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
   TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator();
 
   /**
+   * Returns a prefixed iterator for this metadata store.
+   * @param prefix fixed key schema specific prefix
+   * @return an iterator limited to entries whose keys start with prefix
+   */
+  TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix)
+      throws IOException;
+
+  /**
    * Returns the Name of this Table.
    * @return - Table Name.
    * @throws IOException on failure.
@@ -228,6 +236,7 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
    *
    * @param startKey a start key.
    * @param count max number of entries to return.
+   * @param prefix fixed key schema specific prefix
    * @param filters customized one or more
    * {@link MetadataKeyFilters.MetadataKeyFilter}.
    * @return a list of entries found in the database or an empty list if the
@@ -236,7 +245,8 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
    * @throws IllegalArgumentException if count is less than 0.
    */
   List<? extends KeyValue<KEY, VALUE>> getRangeKVs(KEY startKey,
-          int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+          int count, KEY prefix,
+          MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException;
 
   /**
@@ -249,6 +259,7 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
    *
    * @param startKey a start key.
    * @param count max number of entries to return.
+   * @param prefix fixed key schema specific prefix
    * @param filters customized one or more
    * {@link MetadataKeyFilters.MetadataKeyFilter}.
    * @return a list of entries found in the database.
@@ -256,7 +267,8 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
    * @throws IllegalArgumentException
    */
   List<? extends KeyValue<KEY, VALUE>> getSequentialRangeKVs(KEY startKey,
-          int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+          int count, KEY prefix,
+          MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException;
 
   /**
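
Call sites now pass the schema prefix explicitly; a null prefix keeps
the old whole-table behavior, as the SCMCertStore and PrefixParser
hunks below show. A hedged call-site sketch mirroring the
BlockManagerImpl change:

    List<? extends Table.KeyValue<String, BlockData>> range =
        blockDataTable.getRangeKVs(startKey, count,
            cData.containerPrefix(), cData.getUnprefixedKeyFilter());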
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index c7f6196..3316536 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -281,6 +281,14 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
   }
 
   @Override
+  public TableIterator<KEY, TypedKeyValue> iterator(KEY prefix)
+      throws IOException {
+    TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> iterator =
+        rawTable.iterator(codecRegistry.asRawData(prefix));
+    return new TypedTableIterator(iterator, keyType, valueType);
+  }
+
+  @Override
   public String getName() throws IOException {
     return rawTable.getName();
   }
@@ -315,19 +323,23 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   @Override
   public List<TypedKeyValue> getRangeKVs(
-          KEY startKey, int count,
+          KEY startKey, int count, KEY prefix,
           MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException {
 
     // A null start key means to start from the beginning of the table.
     // Cannot convert a null key to bytes.
     byte[] startKeyBytes = null;
+    byte[] prefixBytes = null;
     if (startKey != null) {
       startKeyBytes = codecRegistry.asRawData(startKey);
     }
+    if (prefix != null) {
+      prefixBytes = codecRegistry.asRawData(prefix);
+    }
 
     List<? extends KeyValue<byte[], byte[]>> rangeKVBytes =
-            rawTable.getRangeKVs(startKeyBytes, count, filters);
+        rawTable.getRangeKVs(startKeyBytes, count, prefixBytes, filters);
 
     List<TypedKeyValue> rangeKVs = new ArrayList<>();
     rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(byteKV)));
@@ -337,19 +349,24 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   @Override
   public List<TypedKeyValue> getSequentialRangeKVs(
-          KEY startKey, int count,
+          KEY startKey, int count, KEY prefix,
           MetadataKeyFilters.MetadataKeyFilter... filters)
           throws IOException, IllegalArgumentException {
 
     // A null start key means to start from the beginning of the table.
     // Cannot convert a null key to bytes.
     byte[] startKeyBytes = null;
+    byte[] prefixBytes = null;
     if (startKey != null) {
       startKeyBytes = codecRegistry.asRawData(startKey);
     }
+    if (prefix != null) {
+      prefixBytes = codecRegistry.asRawData(prefix);
+    }
 
     List<? extends KeyValue<byte[], byte[]>> rangeKVBytes =
-            rawTable.getSequentialRangeKVs(startKeyBytes, count, filters);
+        rawTable.getSequentialRangeKVs(startKeyBytes, count,
+            prefixBytes, filters);
 
     List<TypedKeyValue> rangeKVs = new ArrayList<>();
     rangeKVBytes.forEach(byteKV -> rangeKVs.add(new TypedKeyValue(byteKV)));
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
index b49556d..2e188ab 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
@@ -24,13 +24,17 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.rocksdb.RocksIterator;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
 import java.util.function.Consumer;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -234,4 +238,54 @@ public class TestRDBStoreIterator {
 
     verify(rocksDBIteratorMock, times(1)).close();
   }
+
+  @Test
+  public void testNullPrefixedIterator() throws IOException {
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock,
+        rocksTableMock, null);
+    verify(rocksDBIteratorMock, times(1)).seekToFirst();
+    clearInvocations(rocksDBIteratorMock);
+
+    iter.seekToFirst();
+    verify(rocksDBIteratorMock, times(1)).seekToFirst();
+    clearInvocations(rocksDBIteratorMock);
+
+    when(rocksDBIteratorMock.isValid()).thenReturn(true);
+    assertTrue(iter.hasNext());
+    verify(rocksDBIteratorMock, times(1)).isValid();
+    verify(rocksDBIteratorMock, times(0)).key();
+
+    iter.seekToLast();
+    verify(rocksDBIteratorMock, times(1)).seekToLast();
+
+    iter.close();
+  }
+
+  @Test
+  public void testNormalPrefixedIterator() throws IOException {
+    byte[] testPrefix = "sample".getBytes(StandardCharsets.UTF_8);
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock,
+        rocksTableMock, testPrefix);
+    verify(rocksDBIteratorMock, times(1)).seek(testPrefix);
+    clearInvocations(rocksDBIteratorMock);
+
+    iter.seekToFirst();
+    verify(rocksDBIteratorMock, times(1)).seek(testPrefix);
+    clearInvocations(rocksDBIteratorMock);
+
+    when(rocksDBIteratorMock.isValid()).thenReturn(true);
+    when(rocksDBIteratorMock.key()).thenReturn(testPrefix);
+    assertTrue(iter.hasNext());
+    verify(rocksDBIteratorMock, times(1)).isValid();
+    verify(rocksDBIteratorMock, times(1)).key();
+
+    try {
+      iter.seekToLast();
+      fail("Prefixed iterator does not support seekToLast");
+    } catch (Exception e) {
+      assertTrue(e instanceof UnsupportedOperationException);
+    }
+
+    iter.close();
+  }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index 0f1858b..387c299 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -24,13 +24,16 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hdds.StringUtils;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,6 +57,10 @@ public class TestRDBTableStore {
           "Fourth", "Fifth",
           "Sixth", "Seventh",
           "Eighth");
+  private final List<String> prefixedFamilies = Arrays.asList(
+      "PrefixFirst"
+  );
+  private static final int PREFIX_LENGTH = 8;
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
   private RDBStore rdbStore = null;
@@ -84,6 +91,13 @@ public class TestRDBTableStore {
       TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
       configSet.add(newConfig);
     }
+    for (String name : prefixedFamilies) {
+      ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
+      cfOptions.useFixedLengthPrefixExtractor(PREFIX_LENGTH);
+
+      TableConfig newConfig = new TableConfig(name, cfOptions);
+      configSet.add(newConfig);
+    }
     rdbStore = new RDBStore(folder.newFolder(), options, configSet);
   }
 
@@ -425,4 +439,123 @@ public class TestRDBTableStore {
       testTable.put(key, value);
     }
   }
+
+  @Test
+  public void testPrefixedIterator() throws Exception {
+    int containerCount = 3;
+    int blockCount = 5;
+    List<String> testPrefixes = generatePrefixes(containerCount);
+    List<Map<String, String>> testData = generateKVs(testPrefixes, blockCount);
+
+    try (Table<byte[], byte[]> testTable = rdbStore.getTable("PrefixFirst")) {
+      // write data
+      populatePrefixedTable(testTable, testData);
+
+      // iterator should seek to right pos in the middle
+      byte[] samplePrefix = testPrefixes.get(2).getBytes(
+          StandardCharsets.UTF_8);
+      try (TableIterator<byte[],
+          ? extends Table.KeyValue<byte[], byte[]>> iter = testTable.iterator(
+              samplePrefix)) {
+        int keyCount = 0;
+        while (iter.hasNext()) {
+          // iterator should only return keys with samplePrefix
+          Assert.assertTrue(Arrays.equals(
+              Arrays.copyOf(iter.next().getKey(), PREFIX_LENGTH),
+              samplePrefix));
+          keyCount++;
+        }
+
+        // iterator should end at right pos
+        Assert.assertEquals(blockCount, keyCount);
+
+        // iterator should be able to seekToFirst
+        iter.seekToFirst();
+        Assert.assertTrue(iter.hasNext());
+        Assert.assertTrue(Arrays.equals(
+            Arrays.copyOf(iter.next().getKey(), PREFIX_LENGTH),
+            samplePrefix));
+      }
+    }
+  }
+
+  @Test
+  public void testPrefixedRangeKVs() throws Exception {
+    int containerCount = 3;
+    int blockCount = 5;
+    List<String> testPrefixes = generatePrefixes(containerCount);
+    List<Map<String, String>> testData = generateKVs(testPrefixes, blockCount);
+
+    try (Table<byte[], byte[]> testTable = rdbStore.getTable("PrefixFirst")) {
+
+      // write data
+      populatePrefixedTable(testTable, testData);
+
+      byte[] samplePrefix = testPrefixes.get(2).getBytes(
+          StandardCharsets.UTF_8);
+
+      // test start at first
+      byte[] startKey = samplePrefix;
+      List<? extends Table.KeyValue<byte[], byte[]>> rangeKVs = testTable
+          .getRangeKVs(startKey, 3, samplePrefix);
+      Assert.assertEquals(3, rangeKVs.size());
+
+      // test start with a middle key
+      startKey = StringUtils.string2Bytes(
+          StringUtils.bytes2String(samplePrefix) + "3");
+      rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix);
+      Assert.assertEquals(2, rangeKVs.size());
+
+      // test with a filter
+      MetadataKeyFilters.KeyPrefixFilter filter1 = new MetadataKeyFilters
+          .KeyPrefixFilter()
+          .addFilter(StringUtils.bytes2String(samplePrefix) + "1");
+      startKey = StringUtils.string2Bytes(
+          StringUtils.bytes2String(samplePrefix));
+      rangeKVs = testTable.getRangeKVs(startKey, blockCount,
+          samplePrefix, filter1);
+      Assert.assertEquals(1, rangeKVs.size());
+
+      // test start with a non-existent key
+      startKey = StringUtils.string2Bytes(
+          StringUtils.bytes2String(samplePrefix) + 123);
+      rangeKVs = testTable.getRangeKVs(startKey, 10, samplePrefix);
+      Assert.assertEquals(0, rangeKVs.size());
+    }
+  }
+
+  private List<String> generatePrefixes(int prefixCount) {
+    List<String> prefixes = new ArrayList<>();
+    for (int i = 0; i < prefixCount; i++) {
+      // use alphabetic chars so we get a fixed-length prefix when
+      // converted to byte[]
+      prefixes.add(RandomStringUtils.randomAlphabetic(PREFIX_LENGTH));
+    }
+    return prefixes;
+  }
+
+  private List<Map<String, String>> generateKVs(List<String> prefixes,
+      int keyCount) {
+    List<Map<String, String>> data = new ArrayList<>();
+    for (String prefix : prefixes) {
+      Map<String, String> kvs = new HashMap<>();
+      for (int i = 0; i < keyCount; i++) {
+        String key = prefix + i;
+        String val = RandomStringUtils.random(10);
+        kvs.put(key, val);
+      }
+      data.add(kvs);
+    }
+    return data;
+  }
+
+  private void populatePrefixedTable(Table<byte[], byte[]> table,
+      List<Map<String, String>> testData) throws IOException {
+    for (Map<String, String> segment : testData) {
+      for (Map.Entry<String, String> entry : segment.entrySet()) {
+        table.put(entry.getKey().getBytes(StandardCharsets.UTF_8),
+            entry.getValue().getBytes(StandardCharsets.UTF_8));
+      }
+    }
+  }
 }
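
The prefixed column family above is configured with a fixed-length
prefix extractor so RocksDB can optimize prefix seeks (for example via
prefix bloom filters). A minimal configuration sketch, assuming 8-byte
prefixes as in the test:

    ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
    cfOptions.useFixedLengthPrefixExtractor(8);
    TableConfig config = new TableConfig("PrefixFirst", cfOptions);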
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
index d199c41..680bf43 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
@@ -265,7 +265,7 @@ public final class SCMCertStore implements CertificateStore {
     } else {
       List<? extends Table.KeyValue<BigInteger, CertInfo>> certs =
           scmMetadataStore.getRevokedCertsV2Table().getRangeKVs(
-          startSerialID, count);
+          startSerialID, count, null);
 
       for (Table.KeyValue<BigInteger, CertInfo> kv : certs) {
         try {
@@ -290,10 +290,10 @@ public final class SCMCertStore implements CertificateStore {
 
     if (role == SCM) {
       return scmMetadataStore.getValidSCMCertsTable().getRangeKVs(
-          startSerialID, count);
+          startSerialID, count, null);
     } else {
       return scmMetadataStore.getValidCertsTable().getRangeKVs(
-          startSerialID, count);
+          startSerialID, count, null);
     }
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index 992bf40..6b00312 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -107,7 +107,8 @@ public class TestStorageContainerManagerHelper {
 
       List<? extends Table.KeyValue<String, BlockData>> kvs =
           db.getStore().getBlockDataTable()
-              .getRangeKVs(cData.startKeyEmpty(), Integer.MAX_VALUE, filter);
+              .getRangeKVs(cData.startKeyEmpty(), Integer.MAX_VALUE,
+                  cData.containerPrefix(), filter);
 
       for (Table.KeyValue<String, BlockData> entry : kvs) {
         pendingDeletionBlocks
@@ -134,7 +135,7 @@ public class TestStorageContainerManagerHelper {
       List<? extends Table.KeyValue<String, BlockData>> kvs =
           db.getStore().getBlockDataTable()
               .getRangeKVs(cData.startKeyEmpty(), Integer.MAX_VALUE,
-                  cData.getUnprefixedKeyFilter());
+                  cData.containerPrefix(), cData.getUnprefixedKeyFilter());
 
       for (Table.KeyValue<String, BlockData> entry : kvs) {
         allBlocks.add(Long.valueOf(entry.getKey()));
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
index 0ebc832..db43cc7 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/PrefixParser.java
@@ -217,7 +217,7 @@ public class PrefixParser implements Callable<Void>, SubcommandWithParent {
 
     List<? extends KeyValue
         <String, ? extends WithParentObjectId>> infoList =
-        table.getRangeKVs(null, 1000, filter);
+        table.getRangeKVs(null, 1000, null, filter);
 
     for (KeyValue<String, ? extends WithParentObjectId> info :infoList) {
       Path key = Paths.get(info.getKey());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org