You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/09/01 21:58:00 UTC

[GitHub] [hadoop-ozone] hanishakoneru commented on a change in pull request #1298: HDDS-3869. Use different column families for datanode block and metadata

hanishakoneru commented on a change in pull request #1298:
URL: https://github.com/apache/hadoop-ozone/pull/1298#discussion_r476042841



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
##########
@@ -280,6 +280,9 @@ public Object construct(Node node) {
         String state = (String) nodes.get(OzoneConsts.STATE);
         kvData
             .setState(ContainerProtos.ContainerDataProto.State.valueOf(state));
+        String schemaVersion = (String) nodes.get(OzoneConsts.SCHEMA_VERSION);
+        kvData.setSchemaVersion(schemaVersion);

Review comment:
       When reading old containerDataYaml which does not container the Schema version field, what value would be returned? IIRC and it returns null, then we should set it to version V1.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
##########
@@ -159,122 +178,126 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData,
     }
     kvContainerData.setDbFile(dbFile);
 
+    if (kvContainerData.getSchemaVersion() == null) {
+      // If this container has not specified a schema version, it is in the old
+      // format with one default column family.
+      kvContainerData.setSchemaVersion(OzoneConsts.SCHEMA_V1);
+    }
+
 
     boolean isBlockMetadataSet = false;
 
     try(ReferenceCountedDB containerDB = BlockUtils.getDB(kvContainerData,
         config)) {
 
+      Table<String, Long> metadataTable =
+              containerDB.getStore().getMetadataTable();
+
       // Set pending deleted block count.
-      byte[] pendingDeleteBlockCount =
-          containerDB.getStore().get(DB_PENDING_DELETE_BLOCK_COUNT_KEY);
+      Long pendingDeleteBlockCount =
+          metadataTable.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT);
       if (pendingDeleteBlockCount != null) {
         kvContainerData.incrPendingDeletionBlocks(
-            Longs.fromByteArray(pendingDeleteBlockCount));
+                pendingDeleteBlockCount.intValue());

Review comment:
       Any reason for using intValue here instead of the long value as incrPendingDeletionBlocks takes in a long parameter?

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
##########
@@ -262,14 +264,17 @@ public void deleteBlock(Container container, BlockID blockID) throws
       getBlockByID(db, blockID);
 
       // Update DB to delete block and set block count and bytes used.
-      BatchOperation batch = new BatchOperation();
-      batch.delete(blockKey);

Review comment:
       blockKey variable is redundant now and can be removed.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
##########
@@ -487,6 +486,7 @@ public void importContainerData(InputStream input,
       containerData.setState(originalContainerData.getState());
       containerData
           .setContainerDBType(originalContainerData.getContainerDBType());
+      containerData.setSchemaVersion(originalContainerData.getSchemaVersion());

Review comment:
       I see that schema version is being set in KeyValueContainerUtil#parseKVContainerData. 
   We can explore the option of setting the default schema version (V1) while reading the Yaml itself so that it is never missed. 

##########
File path: hadoop-hdds/container-service/src/test/resources/123-dn-container.db/LOG
##########
@@ -0,0 +1,284 @@
+2020/08/03-15:13:40.359520 7f80eb7a9700 RocksDB version: 6.8.1
+2020/08/03-15:13:40.359563 7f80eb7a9700 Git sha rocksdb_build_git_sha:
+2020/08/03-15:13:40.359566 7f80eb7a9700 Compile date Apr 26 2020

Review comment:
       LOCK and LOG files are not required to load the  DB.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
##########
@@ -159,122 +178,126 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData,
     }
     kvContainerData.setDbFile(dbFile);
 
+    if (kvContainerData.getSchemaVersion() == null) {
+      // If this container has not specified a schema version, it is in the old
+      // format with one default column family.
+      kvContainerData.setSchemaVersion(OzoneConsts.SCHEMA_V1);
+    }
+
 
     boolean isBlockMetadataSet = false;
 
     try(ReferenceCountedDB containerDB = BlockUtils.getDB(kvContainerData,
         config)) {
 
+      Table<String, Long> metadataTable =
+              containerDB.getStore().getMetadataTable();
+
       // Set pending deleted block count.
-      byte[] pendingDeleteBlockCount =
-          containerDB.getStore().get(DB_PENDING_DELETE_BLOCK_COUNT_KEY);
+      Long pendingDeleteBlockCount =
+          metadataTable.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT);
       if (pendingDeleteBlockCount != null) {
         kvContainerData.incrPendingDeletionBlocks(
-            Longs.fromByteArray(pendingDeleteBlockCount));
+                pendingDeleteBlockCount.intValue());
       } else {
         // Set pending deleted block count.
         MetadataKeyFilters.KeyPrefixFilter filter =
-            new MetadataKeyFilters.KeyPrefixFilter()
-                .addFilter(OzoneConsts.DELETING_KEY_PREFIX);
+                MetadataKeyFilters.getDeletingKeyFilter();
         int numPendingDeletionBlocks =
-            containerDB.getStore().getSequentialRangeKVs(null,
-                Integer.MAX_VALUE, filter)
-                .size();
+            containerDB.getStore().getBlockDataTable()
+            .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
+            .size();
         kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
       }
 
       // Set delete transaction id.
-      byte[] delTxnId =
-          containerDB.getStore().get(DB_CONTAINER_DELETE_TRANSACTION_KEY);
+      Long delTxnId =
+          metadataTable.get(OzoneConsts.DELETE_TRANSACTION_KEY);
       if (delTxnId != null) {
         kvContainerData
-            .updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
+            .updateDeleteTransactionId(delTxnId);
       }
 
       // Set BlockCommitSequenceId.
-      byte[] bcsId = containerDB.getStore().get(
-          DB_BLOCK_COMMIT_SEQUENCE_ID_KEY);
+      Long bcsId = metadataTable.get(
+          OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID);
       if (bcsId != null) {
         kvContainerData
-            .updateBlockCommitSequenceId(Longs.fromByteArray(bcsId));
+            .updateBlockCommitSequenceId(bcsId);
       }
 
       // Set bytes used.
       // commitSpace for Open Containers relies on usedBytes
-      byte[] bytesUsed =
-          containerDB.getStore().get(DB_CONTAINER_BYTES_USED_KEY);
+      Long bytesUsed =
+          metadataTable.get(OzoneConsts.CONTAINER_BYTES_USED);
       if (bytesUsed != null) {
         isBlockMetadataSet = true;
-        kvContainerData.setBytesUsed(Longs.fromByteArray(bytesUsed));
+        kvContainerData.setBytesUsed(bytesUsed);
       }
 
       // Set block count.
-      byte[] blockCount = containerDB.getStore().get(DB_BLOCK_COUNT_KEY);
+      Long blockCount = metadataTable.get(OzoneConsts.BLOCK_COUNT);
       if (blockCount != null) {
         isBlockMetadataSet = true;
-        kvContainerData.setKeyCount(Longs.fromByteArray(blockCount));
+        kvContainerData.setKeyCount(blockCount);
       }
     }
 
     if (!isBlockMetadataSet) {
-      initializeUsedBytesAndBlockCount(kvContainerData);
+      initializeUsedBytesAndBlockCount(kvContainerData, config);
     }
   }
 
 
   /**
    * Initialize bytes used and block count.
-   * @param kvContainerData
+   * @param kvData
    * @throws IOException
    */
   private static void initializeUsedBytesAndBlockCount(
-      KeyValueContainerData kvContainerData) throws IOException {
-
-    MetadataKeyFilters.KeyPrefixFilter filter =
-            new MetadataKeyFilters.KeyPrefixFilter();
+      KeyValueContainerData kvData, ConfigurationSource config)
+          throws IOException {
 
-    // Ignore all blocks except those with no prefix, or those with
-    // #deleting# prefix.
-    filter.addFilter(OzoneConsts.DELETED_KEY_PREFIX, true)
-          .addFilter(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX, true)
-          .addFilter(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX, true)
-          .addFilter(OzoneConsts.BLOCK_COUNT, true)
-          .addFilter(OzoneConsts.CONTAINER_BYTES_USED, true)
-          .addFilter(OzoneConsts.PENDING_DELETE_BLOCK_COUNT, true);
+    final String errorMessage = "Failed to parse block data for" +
+            " Container " + kvData.getContainerID();
 
     long blockCount = 0;
-    try (KeyValueBlockIterator blockIter = new KeyValueBlockIterator(
-        kvContainerData.getContainerID(),
-        new File(kvContainerData.getContainerPath()), filter)) {
-      long usedBytes = 0;
-
-
-      boolean success = true;
-      while (success) {
-        try {
-          if (blockIter.hasNext()) {
-            BlockData block = blockIter.nextBlock();
-            long blockLen = 0;
-
-            List< ContainerProtos.ChunkInfo > chunkInfoList = block.getChunks();
-            for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
-              ChunkInfo info = ChunkInfo.getFromProtoBuf(chunk);
-              blockLen += info.getLen();
-            }
-
-            usedBytes += blockLen;
-            blockCount++;
-          } else {
-            success = false;
+    long usedBytes = 0;
+
+    try(ReferenceCountedDB db = BlockUtils.getDB(kvData, config)) {
+      // Count all regular blocks.
+      try (BlockIterator<BlockData> blockIter =
+                   db.getStore().getBlockIterator(
+                           MetadataKeyFilters.getUnprefixedKeyFilter())) {
+
+        while (blockIter.hasNext()) {
+          blockCount++;
+          try {
+            usedBytes += blockIter.nextBlock().getSize();

Review comment:
       The way usedBytes is calculated has been changed. I am not sure if there will be any implications if this calculation us wrong. Need to dig deeper. 
   We should probably separate this optimization into a separate Jira. What do you think?
   

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
##########
@@ -487,6 +486,7 @@ public void importContainerData(InputStream input,
       containerData.setState(originalContainerData.getState());
       containerData
           .setContainerDBType(originalContainerData.getContainerDBType());
+      containerData.setSchemaVersion(originalContainerData.getSchemaVersion());

Review comment:
       importContainerData() is called when replicating containers. If an old container needs to be replicated, it would not have the schema version. This can be fixed by setting a default value for schema version if it does not exist in ContainerDataYaml Constructor.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
##########
@@ -209,30 +208,35 @@ private void deleteKeyValueContainerBlocks(
     int newDeletionBlocks = 0;
     try(ReferenceCountedDB containerDB =
             BlockUtils.getDB(containerData, conf)) {
-      for (Long blk : delTX.getLocalIDList()) {
-        BatchOperation batch = new BatchOperation();
-        byte[] blkBytes = Longs.toByteArray(blk);
-        byte[] blkInfo = containerDB.getStore().get(blkBytes);
+      Table<String, BlockData> blockDataTable =
+              containerDB.getStore().getBlockDataTable();

Review comment:
       Is the BlockData table loaded in memory when the store is initialized?

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
##########
@@ -115,11 +116,14 @@ protected boolean removeLRU(LinkEntry entry) {
    * @param containerID - ID of the container.
    * @param containerDBType - DB type of the container.
    * @param containerDBPath - DB path of the container.
+   * @param schemaVersion - Schema version of the container.
    * @param conf - Hadoop Configuration.
    * @return ReferenceCountedDB.
    */
   public ReferenceCountedDB getDB(long containerID, String containerDBType,
-                             String containerDBPath, ConfigurationSource conf)
+                                  String containerDBPath,

Review comment:
       @arp7, is it ok to remove LevelDB support? 
   The option to configure DNs to use LevelDB was removed in 0.6.0. And since upgrade from previous versions is not supported, I think it is safe to remove LevelDB support.
   
   If yes, we can open a new Jira to clean up the LevelDB code path. If no, we need to take care of that here. 
   
   The containerDBType parameter is redundant here if old LevelDB containers will not be supported.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
##########
@@ -324,14 +328,14 @@ public void shutdown() {
 
   private byte[] getBlockByID(ReferenceCountedDB db, BlockID blockID)
       throws IOException {
-    byte[] blockKey = Longs.toByteArray(blockID.getLocalID());
+    String blockKey = Long.toString(blockID.getLocalID());
 
-    byte[] blockData = db.getStore().get(blockKey);
+    BlockData blockData = db.getStore().getBlockDataTable().get(blockKey);
     if (blockData == null) {
       throw new StorageContainerException(NO_SUCH_BLOCK_ERR_MSG,
           NO_SUCH_BLOCK);
     }
 
-    return blockData;
+    return blockData.getProtoBufMessage().toByteArray();

Review comment:
       All the usages of getBlockByID convert the returned byte array back to ContainerProtos.BlockData. We can avoid this serialization-deserialization. 
   Noting it down here so that we can open a new Jira to optimize this.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.metadata;
+
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Wrapper class to represent a table in a datanode RocksDB instance.
+ * This class can wrap any {@link Table} instance, but will throw
+ * {@link UnsupportedOperationException} for {@link Table#iterator}.
+ * This is because differing schema versions used in datanode DB layouts may
+ * have differing underlying table structures, so iterating a table instance
+ * directly, without taking into account key prefixes, may yield unexpected
+ * results.
+ */
+public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
+
+  private final Table<KEY, VALUE> table;
+
+  public DatanodeTable(Table<KEY, VALUE> table) {
+    this.table = table;
+  }
+
+  @Override
+  public void put(KEY key, VALUE value) throws IOException {
+    table.put(key, value);
+  }
+
+  @Override
+  public void putWithBatch(BatchOperation batch, KEY key,
+                           VALUE value) throws IOException {
+    table.putWithBatch(batch, key, value);
+  }
+
+  @Override
+  public boolean isEmpty() throws IOException {
+    return table.isEmpty();
+  }
+
+  @Override
+  public void delete(KEY key) throws IOException {
+    table.delete(key);
+  }
+
+  @Override
+  public void deleteWithBatch(BatchOperation batch, KEY key)
+          throws IOException {
+    table.deleteWithBatch(batch, key);
+  }
+
+  @Override
+  public final TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator() {
+    throw new UnsupportedOperationException("Iterating tables directly is not" +
+            " supported for datanode containers due to differing schema " +
+            "version.");
+  }
+
+  @Override
+  public String getName() throws IOException {
+    return table.getName();
+  }
+
+  @Override
+  public long getEstimatedKeyCount() throws IOException {
+    return table.getEstimatedKeyCount();
+  }
+
+  @Override
+  public void addCacheEntry(CacheKey<KEY> cacheKey,
+                            CacheValue<VALUE> cacheValue) {

Review comment:
       I think we don't need cache functionality for DatanodeTable? I see it being used only in OM.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.metadata;
+
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * For RocksDB instances written using DB schema version 1, all data is
+ * stored in the default column family. This differs from later schema
+ * versions, which put deleted blocks in a different column family.
+ * As a result, the block IDs used as keys for deleted blocks must be
+ * prefixed in schema version 1 so that they can be differentiated from
+ * regular blocks. However, these prefixes are not necessary in later schema
+ * versions, because the deleted blocks and regular blocks are in different
+ * column families.
+ * <p>
+ * Since clients must operate independently of the underlying schema version,
+ * This class is returned to clients using {@link DatanodeStoreSchemaOneImpl}
+ * instances, allowing them to access keys as if no prefix is
+ * required, while it adds the prefix when necessary.
+ * This means the client should omit the deleted prefix when putting and
+ * getting keys, regardless of the schema version.
+ * <p>
+ * Note that this class will only apply prefixes to keys as parameters,
+ * never as return types. This means that keys returned through iterators
+ * like {@link SchemaOneDeletedBlocksTable#getSequentialRangeKVs}, and
+ * {@link SchemaOneDeletedBlocksTable#getRangeKVs} will return keys prefixed
+ * with {@link SchemaOneDeletedBlocksTable#DELETED_KEY_PREFIX}.
+ */

Review comment:
       This would mean someone iterating through all the deleted blocks in a DN might get a mixture of keys with and without the deleted key prefix. 
   Is there such a scenario in the code currently?

##########
File path: hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestContainerCache.java
##########
@@ -85,17 +86,17 @@ public void testContainerCacheEviction() throws Exception {
 
     // Get 2 references out of the same db and verify the objects are same.
     ReferenceCountedDB db1 = cache.getDB(1, "RocksDB",
-        containerDir1.getPath(), conf);
+            containerDir1.getPath(), OzoneConsts.SCHEMA_LATEST, conf);

Review comment:
       Nitpick: It would be good to have a method in ContainerCache which calls the current getDB with Schema_Latest. We can avoid specifying the schema version every time then.
   ```
   public ReferenceCountedDB getDB(long containerID, String containerDBType, String containerDBPath,
       ConfigurationSource conf) {
       getDB(containerID, containerDBType, OzoneConsts.SCHEMA_LATEST, conf);
   }
   ```

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.metadata;
+
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.LongCodec;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+
+/**
+ * This class defines the RocksDB structure for datanodes following schema
+ * version 2, where the block data, metadata, and deleted block ids are put in
+ * their own separate column families.
+ */
+public class DatanodeSchemaTwoDBDefinition extends
+        AbstractDatanodeDBDefinition {
+
+  public static final DBColumnFamilyDefinition<String, BlockData>
+          BLOCK_DATA =
+          new DBColumnFamilyDefinition<>(
+                  "block_data",

Review comment:
       Can we define the table names as static final fields either here or in OzoneConsts?

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.metadata;
+
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Wrapper class to represent a table in a datanode RocksDB instance.
+ * This class can wrap any {@link Table} instance, but will throw
+ * {@link UnsupportedOperationException} for {@link Table#iterator}.
+ * This is because differing schema versions used in datanode DB layouts may
+ * have differing underlying table structures, so iterating a table instance
+ * directly, without taking into account key prefixes, may yield unexpected
+ * results.
+ */
+public class DatanodeTable<KEY, VALUE> implements Table<KEY, VALUE> {
+
+  private final Table<KEY, VALUE> table;
+

Review comment:
       From what I could understand, the underlying table structure is TypedTabled. Is that correct?
   If yes, should DatanodeTable extend TypedTable instead of Table<KEY, VALUE> so that it does not have to override all the methods?

##########
File path: hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
##########
@@ -471,4 +470,59 @@ public void testBlockThrottle() throws Exception {
       service.shutdown();
     }
   }
+
+  @Test
+  public void testDeletedChunkInfo() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
+    conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+    ContainerSet containerSet = new ContainerSet();
+    createToDeleteBlocks(containerSet, conf, 1, 2, 3);
+
+    List<ContainerData> containerData = Lists.newArrayList();
+    containerSet.listContainer(0L, 1, containerData);
+
+    try(ReferenceCountedDB meta = BlockUtils.getDB(
+            (KeyValueContainerData) containerData.get(0), conf)) {
+
+      // Collect all ChunkInfo from blocks marked for deletion.
+      List<? extends Table.KeyValue<String, BlockData>> deletingBlocks =
+              meta.getStore().getBlockDataTable()
+              .getRangeKVs(null, 100,
+                      MetadataKeyFilters.getDeletingKeyFilter());
+
+      // Delete all blocks marked for deletion.
+      BlockDeletingServiceTestImpl svc =
+              getBlockDeletingService(containerSet, conf);
+      svc.start();
+      GenericTestUtils.waitFor(svc::isStarted, 100, 3000);
+      deleteAndWait(svc, 1);
+      svc.shutdown();
+
+      // Get deleted blocks from their table, and check their ChunkInfo lists
+      // against those we saved for them before deletion.
+      List<? extends Table.KeyValue<String, ChunkInfoList>> deletedBlocks =
+              meta.getStore().getDeletedBlocksTable()
+              .getRangeKVs(null, 100);
+
+      Assert.assertEquals(deletingBlocks.size(), deletedBlocks.size());
+
+      Iterator<? extends Table.KeyValue<String, BlockData>>
+              deletingBlocksIter = deletingBlocks.iterator();
+      Iterator<? extends Table.KeyValue<String, ChunkInfoList>>
+              deletedBlocksIter = deletedBlocks.iterator();
+
+      while(deletingBlocksIter.hasNext() && deletedBlocksIter.hasNext())  {
+        List<ContainerProtos.ChunkInfo> deletingChunks =
+                deletingBlocksIter.next().getValue().getChunks();
+        List<ContainerProtos.ChunkInfo> deletedChunks =
+                deletedBlocksIter.next().getValue().asList();
+

Review comment:
       This test is dependent on the order of deleted blocks. Does RocksDB iterator ensure that the deleting and deleted block keys are returned in the same sorted order?

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneChunkInfoListCodec.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.metadata;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import java.io.IOException;
+
+/**
+ * Codec for parsing {@link ContainerProtos.ChunkInfoList} objects from data
+ * that may have been written using schema version one. Before upgrading
+ * schema versions, deleted block IDs were stored with a duplicate copy of
+ * their ID as the value in the database. After upgrading the code, any
+ * deletes that happen on the DB will save the chunk information with the
+ * deleted blocks instead, even if those deletes are performed on a database
+ * created with schema version one.
+ * <p>

Review comment:
       Shouldn't Schema 1 DBs store the deleted blocks in the old format?
   IIUC, if SchemaOneChunkInfoListCodec implements the Block ID codec, then the InvalidProtocolBufferException can be avoided. 
   Please let me know if there is any other reason for implementing it this way.
   

##########
File path: hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
##########
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.*;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests processing of containers written with DB schema version 1,
+ * which stores all its data in the default RocksDB column family.
+ * Newer schema version will use a different column family layout, but they
+ * should still be able to read, delete data, and update metadata for schema
+ * version 1 containers.
+ * <p>
+ * The functionality executed by these tests assumes that all containers will
+ * have to be closed before an upgrade, meaning that containers written with
+ * schema version 1 will only ever be encountered in their closed state.
+ * <p>
+ * For an example of a RocksDB instance written with schema version 1, see
+ * {@link TestDB}, which is used by these tests to load a pre created schema
+ * version 1 RocksDB instance from test resources.
+ */
+public class TestSchemaOneBackwardsCompatibility {
+  private OzoneConfiguration conf;
+
+  private File metadataDir;
+  private File dbFile;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    TestDB testDB = new TestDB();
+
+    // Copy data to the temporary folder so it can be safely modified.
+    File tempMetadataDir =
+            tempFolder.newFolder(Long.toString(TestDB.CONTAINER_ID),
+                    OzoneConsts.CONTAINER_META_PATH);
+
+    FileUtils.copyDirectoryToDirectory(testDB.getDBDirectory(),
+            tempMetadataDir);
+    FileUtils.copyFileToDirectory(testDB.getContainerFile(), tempMetadataDir);
+
+    metadataDir = tempMetadataDir;
+    File[] potentialDBFiles = metadataDir.listFiles((dir, name) ->
+            name.equals(TestDB.DB_NAME));
+
+    if (potentialDBFiles == null || potentialDBFiles.length != 1) {
+      throw new IOException("Failed load file named " + TestDB.DB_NAME + " " +
+              "from the metadata directory " + metadataDir.getAbsolutePath());
+    }
+
+    dbFile = potentialDBFiles[0];
+  }
+
+  /**
+   * Because all tables in schema version one map back to the default table,
+   * directly iterating any of the table instances should be forbidden.
+   * Otherwise, the iterators for each table would read the entire default
+   * table, return all database contents, and yield unexpected results.
+   * @throws Exception
+   */
+  @Test
+  public void testDirectTableIterationDisabled() throws Exception {
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(kvData, conf)) {
+      DatanodeStore store = refCountedDB.getStore();
+
+      assertTableIteratorUnsupported(store.getMetadataTable());
+      assertTableIteratorUnsupported(store.getBlockDataTable());
+      assertTableIteratorUnsupported(store.getDeletedBlocksTable());
+    }
+  }
+
+  private void assertTableIteratorUnsupported(Table<?, ?> table) {
+    try {
+      table.iterator();
+      Assert.fail("Table iterator should have thrown " +
+              "UnsupportedOperationException.");
+    } catch (UnsupportedOperationException ex) {
+      // Exception thrown as expected.
+    }
+  }
+
+  /**
+   * Counts the number of deleted, pending delete, and regular blocks in the
+   * database, and checks that they match the expected values.
+   * @throws IOException
+   */
+  @Test
+  public void testBlockIteration() throws IOException {
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(kvData, conf)) {
+      assertEquals(TestDB.NUM_DELETED_BLOCKS, countDeletedBlocks(refCountedDB));
+
+      assertEquals(TestDB.NUM_PENDING_DELETION_BLOCKS,
+              countDeletingBlocks(refCountedDB));
+
+      assertEquals(TestDB.KEY_COUNT - TestDB.NUM_PENDING_DELETION_BLOCKS,
+              countUnprefixedBlocks(refCountedDB));
+    }
+  }
+
+  /**
+   * Tests reading of a container that was written in schema version 1, when
+   * the container has metadata keys present.
+   * The {@link KeyValueContainerUtil} will read these values to fill in a
+   * {@link KeyValueContainerData} object.
+   * @throws Exception
+   */
+  @Test
+  public void testReadWithMetadata() throws Exception {
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+    checkContainerData(kvData);
+  }
+
+  /**
+   * Tests reading of a container that was written in schema version 1, when
+   * the container has no metadata keys present.
+   * The {@link KeyValueContainerUtil} will scan the blocks in the database
+   * to fill these metadata values into the database and into a
+   * {@link KeyValueContainerData} object.
+   * @throws Exception
+   */
+  @Test
+  public void testReadWithoutMetadata() throws Exception {
+    // Init the kvData enough values so we can get the database to modify for
+    // testing and then read.
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+
+    // Delete metadata keys from our copy of the DB.
+    // This simulates them not being there to start with.
+    try (ReferenceCountedDB db = BlockUtils.getDB(kvData, conf)) {
+      Table<String, Long> metadataTable = db.getStore().getMetadataTable();
+
+      metadataTable.delete(OzoneConsts.BLOCK_COUNT);
+      assertNull(metadataTable.get(OzoneConsts.BLOCK_COUNT));
+
+      metadataTable.delete(OzoneConsts.CONTAINER_BYTES_USED);
+      assertNull(metadataTable.get(OzoneConsts.CONTAINER_BYTES_USED));
+
+      metadataTable.delete(OzoneConsts.PENDING_DELETE_BLOCK_COUNT);
+      assertNull(metadataTable.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT));
+    }
+
+    // Create a new container data object, and fill in its metadata by
+    // counting blocks from the database, since the metadata keys in the
+    // database are now gone.
+    kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+    checkContainerData(kvData);
+  }
+
+  /**
+   * Tests reading blocks marked for deletion from a container written in
+   * schema version 1. Because the block deleting service both reads for
+   * deleted blocks and deletes them, this test will modify its copy of the
+   * database.
+   */
+  @Test
+  public void testDelete() throws Exception {
+    final long numBlocksToDelete = TestDB.NUM_PENDING_DELETION_BLOCKS;
+
+    runBlockDeletingService();
+
+    // Expected values after blocks with #deleting# prefix in original DB are
+    // deleted.
+    final long expectedDeletingBlocks =
+            TestDB.NUM_PENDING_DELETION_BLOCKS - numBlocksToDelete;
+    final long expectedDeletedBlocks =
+            TestDB.NUM_DELETED_BLOCKS + numBlocksToDelete;
+    final long expectedRegularBlocks =
+            TestDB.KEY_COUNT - numBlocksToDelete;
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(newKvData(), conf)) {
+      // Test results via block iteration.
+      assertEquals(expectedDeletingBlocks,
+              countDeletingBlocks(refCountedDB));
+      assertEquals(expectedDeletedBlocks,
+              countDeletedBlocks(refCountedDB));
+      assertEquals(expectedRegularBlocks,
+              countUnprefixedBlocks(refCountedDB));
+
+      // Test table metadata.
+      Table<String, Long> metadataTable =
+              refCountedDB.getStore().getMetadataTable();
+      assertEquals(expectedRegularBlocks + expectedDeletingBlocks,
+              (long)metadataTable.get(OzoneConsts.BLOCK_COUNT));
+      assertEquals(TestDB.BYTES_USED,
+              (long)metadataTable.get(OzoneConsts.CONTAINER_BYTES_USED));
+    }
+  }
+
+  /**
+   * Tests reading the chunk info saved from a block that was deleted from a
+   * database in schema version one. Blocks deleted from schema version one
+   * before the upgrade will have the block ID saved as their value. Trying
+   * to retrieve this value as a {@link ChunkInfoList} should fail. Blocks
+   * deleted from schema version one after the upgrade should have their
+   * {@link ChunkInfoList} saved as the corresponding value in the deleted
+   * blocks table. Reading these values should succeed.
+   * @throws Exception
+   */
+  @Test
+  public void testReadDeletedBlockChunkInfo() throws Exception {
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(newKvData(), conf)) {
+      // Read blocks that were already deleted before the upgrade.
+      List<? extends Table.KeyValue<String, ChunkInfoList>> deletedBlocks =
+              refCountedDB.getStore()
+                      .getDeletedBlocksTable().getRangeKVs(null, 100);
+
+      Set<String> preUpgradeBlocks = new HashSet<>();
+
+      for(Table.KeyValue<String, ChunkInfoList> chunkListKV: deletedBlocks) {
+        preUpgradeBlocks.add(chunkListKV.getKey());
+        try {
+          chunkListKV.getValue();
+          Assert.fail("No exception thrown when trying to retrieve old " +
+                  "deleted blocks values as chunk lists.");
+        } catch(IOException ex) {
+          // Exception thrown as expected.
+        }
+      }
+
+      Assert.assertEquals(TestDB.NUM_DELETED_BLOCKS, preUpgradeBlocks.size());
+
+      runBlockDeletingService();
+
+      // After the block deleting service runs, get the updated list of
+      // deleted blocks.
+      deletedBlocks = refCountedDB.getStore()
+                      .getDeletedBlocksTable().getRangeKVs(null, 100);
+
+      int numPostUpgradeDeletesFound = 0;
+      for(Table.KeyValue<String, ChunkInfoList> chunkListKV: deletedBlocks) {
+        if (!preUpgradeBlocks.contains(chunkListKV.getKey())) {
+          numPostUpgradeDeletesFound++;
+          Assert.assertNotNull(chunkListKV.getValue());
+        }
+      }
+
+      // The blocks that were originally marked for deletion should now be
+      // deleted.
+      Assert.assertEquals(TestDB.NUM_PENDING_DELETION_BLOCKS,
+              numPostUpgradeDeletesFound);
+    }
+
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(newKvData(), conf)) {
+      List<? extends Table.KeyValue<String, ChunkInfoList>> deletedBlocks =
+              refCountedDB.getStore().getDeletedBlocksTable()
+                      .getRangeKVs(null, 100);

Review comment:
       Unused code block.

##########
File path: hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
##########
@@ -88,196 +86,314 @@ public void setUp() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+
+    containerData = new KeyValueContainerData(105L,
+            layout,
+            (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
+            UUID.randomUUID().toString());
+    // Init the container.
+    container = new KeyValueContainer(containerData, conf);
+    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
+            .randomUUID().toString());
+    db = BlockUtils.getDB(containerData, conf);
   }
 
 
   @After
-  public void tearDown() {
+  public void tearDown() throws Exception {
+    db.close();
+    db.cleanup();
     volumeSet.shutdown();
     FileUtil.fullyDelete(testRoot);
   }
 
   @Test
   public void testKeyValueBlockIteratorWithMixedBlocks() throws Exception {
-
-    long containerID = 100L;
-    int deletedBlocks = 5;
+    int deletingBlocks = 5;
     int normalBlocks = 5;
-    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath))) {
+    Map<String, List<Long>> blockIDs = createContainerWithBlocks(CONTAINER_ID,
+            normalBlocks,
+            deletingBlocks);

Review comment:
       The general continuation indent used throughout the code is 4. 
   
   @elek, any idea if continuation indent should be 4 or does it not matter? Checkstyle CI seems to not care about it.

##########
File path: hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
##########
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.*;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests processing of containers written with DB schema version 1,
+ * which stores all its data in the default RocksDB column family.
+ * Newer schema version will use a different column family layout, but they
+ * should still be able to read, delete data, and update metadata for schema
+ * version 1 containers.
+ * <p>
+ * The functionality executed by these tests assumes that all containers will
+ * have to be closed before an upgrade, meaning that containers written with
+ * schema version 1 will only ever be encountered in their closed state.
+ * <p>
+ * For an example of a RocksDB instance written with schema version 1, see
+ * {@link TestDB}, which is used by these tests to load a pre created schema
+ * version 1 RocksDB instance from test resources.
+ */
+public class TestSchemaOneBackwardsCompatibility {
+  private OzoneConfiguration conf;
+
+  private File metadataDir;
+  private File dbFile;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    TestDB testDB = new TestDB();
+
+    // Copy data to the temporary folder so it can be safely modified.
+    File tempMetadataDir =
+            tempFolder.newFolder(Long.toString(TestDB.CONTAINER_ID),
+                    OzoneConsts.CONTAINER_META_PATH);
+
+    FileUtils.copyDirectoryToDirectory(testDB.getDBDirectory(),
+            tempMetadataDir);
+    FileUtils.copyFileToDirectory(testDB.getContainerFile(), tempMetadataDir);
+
+    metadataDir = tempMetadataDir;
+    File[] potentialDBFiles = metadataDir.listFiles((dir, name) ->
+            name.equals(TestDB.DB_NAME));
+
+    if (potentialDBFiles == null || potentialDBFiles.length != 1) {
+      throw new IOException("Failed load file named " + TestDB.DB_NAME + " " +
+              "from the metadata directory " + metadataDir.getAbsolutePath());
+    }
+
+    dbFile = potentialDBFiles[0];
+  }
+
+  /**
+   * Because all tables in schema version one map back to the default table,
+   * directly iterating any of the table instances should be forbidden.
+   * Otherwise, the iterators for each table would read the entire default
+   * table, return all database contents, and yield unexpected results.
+   * @throws Exception
+   */
+  @Test
+  public void testDirectTableIterationDisabled() throws Exception {
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(kvData, conf)) {
+      DatanodeStore store = refCountedDB.getStore();
+
+      assertTableIteratorUnsupported(store.getMetadataTable());
+      assertTableIteratorUnsupported(store.getBlockDataTable());
+      assertTableIteratorUnsupported(store.getDeletedBlocksTable());
+    }
+  }
+
+  private void assertTableIteratorUnsupported(Table<?, ?> table) {
+    try {
+      table.iterator();
+      Assert.fail("Table iterator should have thrown " +
+              "UnsupportedOperationException.");
+    } catch (UnsupportedOperationException ex) {
+      // Exception thrown as expected.
+    }
+  }
+
+  /**
+   * Counts the number of deleted, pending delete, and regular blocks in the
+   * database, and checks that they match the expected values.
+   * @throws IOException
+   */
+  @Test
+  public void testBlockIteration() throws IOException {
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(kvData, conf)) {
+      assertEquals(TestDB.NUM_DELETED_BLOCKS, countDeletedBlocks(refCountedDB));
+
+      assertEquals(TestDB.NUM_PENDING_DELETION_BLOCKS,
+              countDeletingBlocks(refCountedDB));
+
+      assertEquals(TestDB.KEY_COUNT - TestDB.NUM_PENDING_DELETION_BLOCKS,
+              countUnprefixedBlocks(refCountedDB));
+    }
+  }
+
+  /**
+   * Tests reading of a container that was written in schema version 1, when
+   * the container has metadata keys present.
+   * The {@link KeyValueContainerUtil} will read these values to fill in a
+   * {@link KeyValueContainerData} object.
+   * @throws Exception
+   */
+  @Test
+  public void testReadWithMetadata() throws Exception {
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+    checkContainerData(kvData);
+  }
+
+  /**
+   * Tests reading of a container that was written in schema version 1, when
+   * the container has no metadata keys present.
+   * The {@link KeyValueContainerUtil} will scan the blocks in the database
+   * to fill these metadata values into the database and into a
+   * {@link KeyValueContainerData} object.
+   * @throws Exception
+   */
+  @Test
+  public void testReadWithoutMetadata() throws Exception {
+    // Init the kvData enough values so we can get the database to modify for
+    // testing and then read.
+    KeyValueContainerData kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+
+    // Delete metadata keys from our copy of the DB.
+    // This simulates them not being there to start with.
+    try (ReferenceCountedDB db = BlockUtils.getDB(kvData, conf)) {
+      Table<String, Long> metadataTable = db.getStore().getMetadataTable();
+
+      metadataTable.delete(OzoneConsts.BLOCK_COUNT);
+      assertNull(metadataTable.get(OzoneConsts.BLOCK_COUNT));
+
+      metadataTable.delete(OzoneConsts.CONTAINER_BYTES_USED);
+      assertNull(metadataTable.get(OzoneConsts.CONTAINER_BYTES_USED));
+
+      metadataTable.delete(OzoneConsts.PENDING_DELETE_BLOCK_COUNT);
+      assertNull(metadataTable.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT));
+    }
+
+    // Create a new container data object, and fill in its metadata by
+    // counting blocks from the database, since the metadata keys in the
+    // database are now gone.
+    kvData = newKvData();
+    KeyValueContainerUtil.parseKVContainerData(kvData, conf);
+    checkContainerData(kvData);
+  }
+
+  /**
+   * Tests reading blocks marked for deletion from a container written in
+   * schema version 1. Because the block deleting service both reads for
+   * deleted blocks and deletes them, this test will modify its copy of the
+   * database.
+   */
+  @Test
+  public void testDelete() throws Exception {
+    final long numBlocksToDelete = TestDB.NUM_PENDING_DELETION_BLOCKS;
+
+    runBlockDeletingService();
+
+    // Expected values after blocks with #deleting# prefix in original DB are
+    // deleted.
+    final long expectedDeletingBlocks =
+            TestDB.NUM_PENDING_DELETION_BLOCKS - numBlocksToDelete;
+    final long expectedDeletedBlocks =
+            TestDB.NUM_DELETED_BLOCKS + numBlocksToDelete;
+    final long expectedRegularBlocks =
+            TestDB.KEY_COUNT - numBlocksToDelete;
+
+    try(ReferenceCountedDB refCountedDB = BlockUtils.getDB(newKvData(), conf)) {
+      // Test results via block iteration.
+      assertEquals(expectedDeletingBlocks,
+              countDeletingBlocks(refCountedDB));
+      assertEquals(expectedDeletedBlocks,
+              countDeletedBlocks(refCountedDB));
+      assertEquals(expectedRegularBlocks,
+              countUnprefixedBlocks(refCountedDB));
+
+      // Test table metadata.
+      Table<String, Long> metadataTable =
+              refCountedDB.getStore().getMetadataTable();
+      assertEquals(expectedRegularBlocks + expectedDeletingBlocks,
+              (long)metadataTable.get(OzoneConsts.BLOCK_COUNT));
+      assertEquals(TestDB.BYTES_USED,
+              (long)metadataTable.get(OzoneConsts.CONTAINER_BYTES_USED));

Review comment:
       Shouldn't Bytes used be decreased by the deleted blocks size?

##########
File path: hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
##########
@@ -88,196 +86,314 @@ public void setUp() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+
+    containerData = new KeyValueContainerData(105L,
+            layout,
+            (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
+            UUID.randomUUID().toString());
+    // Init the container.
+    container = new KeyValueContainer(containerData, conf);
+    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
+            .randomUUID().toString());
+    db = BlockUtils.getDB(containerData, conf);
   }
 
 
   @After
-  public void tearDown() {
+  public void tearDown() throws Exception {
+    db.close();
+    db.cleanup();
     volumeSet.shutdown();
     FileUtil.fullyDelete(testRoot);
   }
 
   @Test
   public void testKeyValueBlockIteratorWithMixedBlocks() throws Exception {
-
-    long containerID = 100L;
-    int deletedBlocks = 5;
+    int deletingBlocks = 5;
     int normalBlocks = 5;
-    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath))) {
+    Map<String, List<Long>> blockIDs = createContainerWithBlocks(CONTAINER_ID,
+            normalBlocks,
+            deletingBlocks);
 
-      int counter = 0;
+    // Default filter used is all unprefixed blocks.
+    List<Long> unprefixedBlockIDs = blockIDs.get("");
+    try(BlockIterator<BlockData> keyValueBlockIterator =
+                db.getStore().getBlockIterator()) {
+
+      Iterator<Long> blockIDIter = unprefixedBlockIDs.iterator();
       while (keyValueBlockIterator.hasNext()) {
         BlockData blockData = keyValueBlockIterator.nextBlock();
-        assertEquals(blockData.getLocalID(), counter++);
+        assertEquals(blockData.getLocalID(), (long)blockIDIter.next());
       }
-
       assertFalse(keyValueBlockIterator.hasNext());
+      assertFalse(blockIDIter.hasNext());
 
       keyValueBlockIterator.seekToFirst();
-      counter = 0;
+      blockIDIter = unprefixedBlockIDs.iterator();
       while (keyValueBlockIterator.hasNext()) {
         BlockData blockData = keyValueBlockIterator.nextBlock();
-        assertEquals(blockData.getLocalID(), counter++);
+        assertEquals(blockData.getLocalID(), (long)blockIDIter.next());
       }
       assertFalse(keyValueBlockIterator.hasNext());
+      assertFalse(blockIDIter.hasNext());
 
       try {
         keyValueBlockIterator.nextBlock();
       } catch (NoSuchElementException ex) {
         GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-            "for ContainerID " + containerID, ex);
+            "for ContainerID " + CONTAINER_ID, ex);
       }
     }
   }
 
   @Test
   public void testKeyValueBlockIteratorWithNextBlock() throws Exception {
-    long containerID = 101L;
-    createContainerWithBlocks(containerID, 2, 0);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath))) {
-      long blockID = 0L;
-      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+    List<Long> blockIDs = createContainerWithBlocks(CONTAINER_ID, 2);
+    try(BlockIterator<BlockData> keyValueBlockIterator =
+                db.getStore().getBlockIterator()) {
+      assertEquals((long)blockIDs.get(0),
+              keyValueBlockIterator.nextBlock().getLocalID());
+      assertEquals((long)blockIDs.get(1),
+              keyValueBlockIterator.nextBlock().getLocalID());
 
       try {
         keyValueBlockIterator.nextBlock();
       } catch (NoSuchElementException ex) {
         GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-            "for ContainerID " + containerID, ex);
+            "for ContainerID " + CONTAINER_ID, ex);
       }
     }
   }
 
   @Test
   public void testKeyValueBlockIteratorWithHasNext() throws Exception {
-    long containerID = 102L;
-    createContainerWithBlocks(containerID, 2, 0);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath))) {
-      long blockID = 0L;
+    List<Long> blockIDs = createContainerWithBlocks(CONTAINER_ID, 2);
+    try(BlockIterator<BlockData> blockIter =
+                db.getStore().getBlockIterator()) {
 
       // Even calling multiple times hasNext() should not move entry forward.
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-      keyValueBlockIterator.seekToLast();
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-      keyValueBlockIterator.seekToFirst();
-      blockID = 0L;
-      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertEquals((long)blockIDs.get(0),
+              blockIter.nextBlock().getLocalID());
+
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertEquals((long)blockIDs.get(1), blockIter.nextBlock().getLocalID());
+
+      blockIter.seekToFirst();
+      assertEquals((long)blockIDs.get(0), blockIter.nextBlock().getLocalID());
+      assertEquals((long)blockIDs.get(1), blockIter.nextBlock().getLocalID());
 
       try {
-        keyValueBlockIterator.nextBlock();
+        blockIter.nextBlock();
       } catch (NoSuchElementException ex) {
         GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-            "for ContainerID " + containerID, ex);
+            "for ContainerID " + CONTAINER_ID, ex);
       }
     }
   }
 
   @Test
   public void testKeyValueBlockIteratorWithFilter() throws Exception {
-    long containerId = 103L;
-    int deletedBlocks = 10;
     int normalBlocks = 5;
-    createContainerWithBlocks(containerId, normalBlocks, deletedBlocks);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerId, new File(containerPath), MetadataKeyFilters
-        .getDeletingKeyFilter())) {
-
-      int counter = 5;
+    int deletingBlocks = 5;
+    Map<String, List<Long>> blockIDs = createContainerWithBlocks(CONTAINER_ID,
+            normalBlocks, deletingBlocks);
+    try(BlockIterator<BlockData> keyValueBlockIterator =
+                db.getStore().getBlockIterator(
+                        MetadataKeyFilters.getDeletingKeyFilter())) {
+      List<Long> deletingBlockIDs =
+              blockIDs.get(OzoneConsts.DELETING_KEY_PREFIX);
+      int counter = 0;
       while (keyValueBlockIterator.hasNext()) {
         BlockData blockData = keyValueBlockIterator.nextBlock();
-        assertEquals(blockData.getLocalID(), counter++);
+        assertEquals(blockData.getLocalID(),
+                (long)deletingBlockIDs.get(counter));
+        counter++;
       }
-      assertEquals(10, counter);
+
+      assertEquals(deletingBlocks, counter);
     }
   }
 
   @Test
   public void testKeyValueBlockIteratorWithOnlyDeletedBlocks() throws
       Exception {
-    long containerId = 104L;
-    createContainerWithBlocks(containerId, 0, 5);
+    createContainerWithBlocks(CONTAINER_ID, 0, 5);
     String containerPath = new File(containerData.getMetadataPath())
         .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerId, new File(containerPath))) {
+    try(BlockIterator<BlockData> keyValueBlockIterator =
+                db.getStore().getBlockIterator()) {
       //As all blocks are deleted blocks, blocks does not match with normal key
       // filter.
       assertFalse(keyValueBlockIterator.hasNext());
     }
   }
 
+  /**
+   * Due to RocksDB internals, prefixed keys may be grouped all at the
+   * beginning or end of the key iteration, depending on the serialization
+   * used. Keys of the same prefix are grouped
+   * together. This method runs the same set of tests on the iterator first
+   * positively filtering one prefix, and then positively filtering
+   * a second prefix. If the sets of keys with prefix one, prefix
+   * two, and no prefixes are not empty, it follows that the filter will
+   * encounter both of the following cases:
+   *
+   * 1. A failing key followed by a passing key.
+   * 2. A passing key followed by a failing key.
+   *
+   * Note that with the current block data table implementation, there is
+   * only ever one type of prefix. This test adds a dummy second prefix type
+   * to ensure that the iterator will continue to work if more prefixes are
+   * added in the future.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testKeyValueBlockIteratorWithAdvancedFilter() throws
+          Exception {
+    // Block data table currently only uses one prefix type.
+    // Introduce a second prefix type to make sure the iterator functions
+    // correctly if more prefixes were to be added in the future.
+    final String secondPrefix = "#FOOBAR#";
+    Map<String, Integer> prefixCounts = new HashMap<>();
+    prefixCounts.put(OzoneConsts.DELETING_KEY_PREFIX, 3);
+    prefixCounts.put("", 3);
+    prefixCounts.put(secondPrefix, 3);
+
+    Map<String, List<Long>> blockIDs = createContainerWithBlocks(CONTAINER_ID,
+            prefixCounts);
+    // Test deleting filter.
+    testWithFilter(MetadataKeyFilters.getDeletingKeyFilter(),
+            blockIDs.get(OzoneConsts.DELETING_KEY_PREFIX));
+
+    // Test arbitrary filter.
+    MetadataKeyFilters.KeyPrefixFilter secondFilter =
+            new MetadataKeyFilters.KeyPrefixFilter()
+            .addFilter(secondPrefix);
+    testWithFilter(secondFilter, blockIDs.get(secondPrefix));
+  }
+
+  /**
+   * Helper method to run some iterator tests with a provided filter.
+   */
+  private void testWithFilter(MetadataKeyFilters.KeyPrefixFilter filter,
+                              List<Long> expectedIDs) throws Exception {
+    try(BlockIterator<BlockData> iterator =
+                db.getStore().getBlockIterator(filter)) {
+      // Test seek.
+      iterator.seekToFirst();
+      long firstID = iterator.nextBlock().getLocalID();
+      assertEquals(expectedIDs.get(0).longValue(), firstID);
+      assertTrue(iterator.hasNext());
+
+      // Test atypical iteration use.
+      iterator.seekToFirst();
+      int numIDsSeen = 0;
+      for (long id: expectedIDs) {
+        assertEquals(iterator.nextBlock().getLocalID(), id);
+        numIDsSeen++;
+
+        // Test that iterator can handle sporadic hasNext() calls.
+        if (id % 2 == 0 && numIDsSeen < expectedIDs.size()) {
+          assertTrue(iterator.hasNext());
+        }
+      }
+
+      assertFalse(iterator.hasNext());
+    }
+  }
+
+  /**
+   * Creates a container with specified number of unprefixed (normal) blocks.
+   * @param containerId
+   * @param normalBlocks
+   * @return The list of block IDs of normal blocks that were created.
+   * @throws Exception
+   */
+  private List<Long> createContainerWithBlocks(long containerId,
+            int normalBlocks) throws Exception {
+    return createContainerWithBlocks(containerId, normalBlocks, 0).get("");
+  }
+
   /**
    * Creates a container with specified number of normal blocks and deleted
-   * blocks. First it will insert normal blocks, and then it will insert
-   * deleted blocks.
+   * blocks.
+   * deleting blocks, then it will insert deleted blocks.

Review comment:
       Javadoc is not clear.

##########
File path: hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
##########
@@ -88,196 +86,314 @@ public void setUp() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+
+    containerData = new KeyValueContainerData(105L,
+            layout,
+            (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
+            UUID.randomUUID().toString());
+    // Init the container.
+    container = new KeyValueContainer(containerData, conf);
+    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
+            .randomUUID().toString());
+    db = BlockUtils.getDB(containerData, conf);
   }
 
 
   @After
-  public void tearDown() {
+  public void tearDown() throws Exception {
+    db.close();
+    db.cleanup();
     volumeSet.shutdown();
     FileUtil.fullyDelete(testRoot);
   }
 
   @Test
   public void testKeyValueBlockIteratorWithMixedBlocks() throws Exception {
-
-    long containerID = 100L;
-    int deletedBlocks = 5;
+    int deletingBlocks = 5;
     int normalBlocks = 5;
-    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath))) {
+    Map<String, List<Long>> blockIDs = createContainerWithBlocks(CONTAINER_ID,
+            normalBlocks,
+            deletingBlocks);
 
-      int counter = 0;
+    // Default filter used is all unprefixed blocks.
+    List<Long> unprefixedBlockIDs = blockIDs.get("");
+    try(BlockIterator<BlockData> keyValueBlockIterator =
+                db.getStore().getBlockIterator()) {
+
+      Iterator<Long> blockIDIter = unprefixedBlockIDs.iterator();
       while (keyValueBlockIterator.hasNext()) {
         BlockData blockData = keyValueBlockIterator.nextBlock();
-        assertEquals(blockData.getLocalID(), counter++);
+        assertEquals(blockData.getLocalID(), (long)blockIDIter.next());
       }
-
       assertFalse(keyValueBlockIterator.hasNext());
+      assertFalse(blockIDIter.hasNext());
 
       keyValueBlockIterator.seekToFirst();
-      counter = 0;
+      blockIDIter = unprefixedBlockIDs.iterator();
       while (keyValueBlockIterator.hasNext()) {
         BlockData blockData = keyValueBlockIterator.nextBlock();
-        assertEquals(blockData.getLocalID(), counter++);
+        assertEquals(blockData.getLocalID(), (long)blockIDIter.next());
       }
       assertFalse(keyValueBlockIterator.hasNext());
+      assertFalse(blockIDIter.hasNext());
 
       try {
         keyValueBlockIterator.nextBlock();
       } catch (NoSuchElementException ex) {
         GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-            "for ContainerID " + containerID, ex);
+            "for ContainerID " + CONTAINER_ID, ex);
       }
     }
   }
 
   @Test
   public void testKeyValueBlockIteratorWithNextBlock() throws Exception {
-    long containerID = 101L;
-    createContainerWithBlocks(containerID, 2, 0);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath))) {
-      long blockID = 0L;
-      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+    List<Long> blockIDs = createContainerWithBlocks(CONTAINER_ID, 2);
+    try(BlockIterator<BlockData> keyValueBlockIterator =
+                db.getStore().getBlockIterator()) {
+      assertEquals((long)blockIDs.get(0),
+              keyValueBlockIterator.nextBlock().getLocalID());
+      assertEquals((long)blockIDs.get(1),
+              keyValueBlockIterator.nextBlock().getLocalID());
 
       try {
         keyValueBlockIterator.nextBlock();
       } catch (NoSuchElementException ex) {
         GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-            "for ContainerID " + containerID, ex);
+            "for ContainerID " + CONTAINER_ID, ex);
       }
     }
   }
 
   @Test
   public void testKeyValueBlockIteratorWithHasNext() throws Exception {
-    long containerID = 102L;
-    createContainerWithBlocks(containerID, 2, 0);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath))) {
-      long blockID = 0L;
+    List<Long> blockIDs = createContainerWithBlocks(CONTAINER_ID, 2);
+    try(BlockIterator<BlockData> blockIter =
+                db.getStore().getBlockIterator()) {
 
       // Even calling multiple times hasNext() should not move entry forward.
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-      keyValueBlockIterator.seekToLast();
-      assertTrue(keyValueBlockIterator.hasNext());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
-
-      keyValueBlockIterator.seekToFirst();
-      blockID = 0L;
-      assertEquals(blockID++, keyValueBlockIterator.nextBlock().getLocalID());
-      assertEquals(blockID, keyValueBlockIterator.nextBlock().getLocalID());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertEquals((long)blockIDs.get(0),
+              blockIter.nextBlock().getLocalID());
+
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertTrue(blockIter.hasNext());
+      assertEquals((long)blockIDs.get(1), blockIter.nextBlock().getLocalID());
+
+      blockIter.seekToFirst();
+      assertEquals((long)blockIDs.get(0), blockIter.nextBlock().getLocalID());
+      assertEquals((long)blockIDs.get(1), blockIter.nextBlock().getLocalID());
 
       try {
-        keyValueBlockIterator.nextBlock();
+        blockIter.nextBlock();
       } catch (NoSuchElementException ex) {
         GenericTestUtils.assertExceptionContains("Block Iterator reached end " +
-            "for ContainerID " + containerID, ex);
+            "for ContainerID " + CONTAINER_ID, ex);
       }
     }
   }
 
   @Test
   public void testKeyValueBlockIteratorWithFilter() throws Exception {
-    long containerId = 103L;
-    int deletedBlocks = 10;
     int normalBlocks = 5;
-    createContainerWithBlocks(containerId, normalBlocks, deletedBlocks);
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    try(KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerId, new File(containerPath), MetadataKeyFilters
-        .getDeletingKeyFilter())) {
-
-      int counter = 5;
+    int deletingBlocks = 5;
+    Map<String, List<Long>> blockIDs = createContainerWithBlocks(CONTAINER_ID,
+            normalBlocks, deletingBlocks);
+    try(BlockIterator<BlockData> keyValueBlockIterator =
+                db.getStore().getBlockIterator(
+                        MetadataKeyFilters.getDeletingKeyFilter())) {
+      List<Long> deletingBlockIDs =
+              blockIDs.get(OzoneConsts.DELETING_KEY_PREFIX);
+      int counter = 0;
       while (keyValueBlockIterator.hasNext()) {
         BlockData blockData = keyValueBlockIterator.nextBlock();
-        assertEquals(blockData.getLocalID(), counter++);
+        assertEquals(blockData.getLocalID(),
+                (long)deletingBlockIDs.get(counter));

Review comment:
       Nitpick: Expected value comes before Actual value in assert. It might be confusing in case there is an exception.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-issues-help@hadoop.apache.org