Posted to commits@ozone.apache.org by si...@apache.org on 2022/10/27 16:48:53 UTC

[ozone] branch ozone-1.3 updated: HDDS-7121. Support namespace summaries (du, dist & counts) for legacy FS buckets (#3746)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch ozone-1.3
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/ozone-1.3 by this push:
     new 61ed95516b HDDS-7121. Support namespace summaries (du, dist & counts) for legacy FS buckets (#3746)
61ed95516b is described below

commit 61ed95516baf578c004dd1be5082797bc024c5ca
Author: Christos Bisias <ch...@gmail.com>
AuthorDate: Tue Oct 25 21:47:56 2022 +0300

    HDDS-7121. Support namespace summaries (du, dist & counts) for legacy FS buckets (#3746)
    
    Change-Id: I478fc74ec8b0899441d1c003c2f200d7dd9dcc67
---
 .../hadoop/ozone/om/helpers/BucketLayout.java      |  13 +
 .../dist/src/main/compose/ozone-legacy-bucket/.env |  20 +
 .../src/main/compose/ozone-legacy-bucket/README.md |  21 +
 .../ozone-legacy-bucket/docker-compose.yaml        |  78 ++++
 .../main/compose/ozone-legacy-bucket/docker-config |  52 +++
 .../src/main/compose/ozone-legacy-bucket/test.sh   |  35 ++
 ...n-fso-nssummary.robot => recon-nssummary.robot} |  12 +-
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |   6 +-
 .../hadoop/ozone/recon/ReconControllerModule.java  |   4 +-
 .../ozone/recon/api/handlers/BucketHandler.java    |  30 +-
 .../recon/api/handlers/DirectoryEntityHandler.java |  20 +-
 .../recon/api/handlers/LegacyBucketHandler.java    | 325 ++++++++++++++
 .../hadoop/ozone/recon/api/types/NSSummary.java    |   4 +-
 .../hadoop/ozone/recon/tasks/NSSummaryTask.java    | 232 ++++------
 ...yTask.java => NSSummaryTaskDbEventHandler.java} |  60 +--
 .../ozone/recon/tasks/NSSummaryTaskWithFSO.java    |  42 +-
 .../ozone/recon/tasks/NSSummaryTaskWithLegacy.java | 307 +++++++++++++
 .../ozone/recon/OMMetadataManagerTestUtils.java    |  24 +-
 .../recon/api/TestNSSummaryEndpointWithFSO.java    |  10 +-
 ...O.java => TestNSSummaryEndpointWithLegacy.java} | 297 +++++++------
 .../ozone/recon/tasks/TestNSSummaryTask.java       | 492 +++++++++++++++++++++
 .../recon/tasks/TestNSSummaryTaskWithFSO.java      |  13 +-
 ...thFSO.java => TestNSSummaryTaskWithLegacy.java} | 328 ++++++++++----
 .../ozone/admin/nssummary/DiskUsageSubCommand.java |   7 +-
 .../admin/nssummary/FileSizeDistSubCommand.java    |   7 +-
 .../ozone/admin/nssummary/NSSummaryAdmin.java      |  57 +++
 .../ozone/admin/nssummary/NSSummaryCLIUtils.java   |   9 +-
 .../admin/nssummary/QuotaUsageSubCommand.java      |   7 +-
 .../ozone/admin/nssummary/SummarySubCommand.java   |   7 +-
 29 files changed, 2040 insertions(+), 479 deletions(-)
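
The feature is exercised end-to-end through Recon's namespace-summary HTTP endpoints; the smoketest below drives /summary, /du, /quota and /dist with curl. As a hedged sketch of the same query from Java — the host, port and the /api/v1/namespace base path are assumptions inferred from the compose files and the test, not confirmed by this patch:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class NSSummaryQuery {
      public static void main(String[] args) throws Exception {
        // Assumed Recon base URL; 9888 matches the compose file's Recon port.
        String base = "http://localhost:9888/api/v1/namespace";
        HttpClient client = HttpClient.newHttpClient();
        // Ask for the namespace summary of a (possibly Legacy) bucket path.
        HttpRequest req = HttpRequest.newBuilder(
            URI.create(base + "/summary?path=/vol1/bucket1")).GET().build();
        HttpResponse<String> resp =
            client.send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.body()); // e.g. {"status":"OK",...}
      }
    }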

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketLayout.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketLayout.java
index 8a17777b11..68147fab44 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketLayout.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketLayout.java
@@ -70,6 +70,19 @@ public enum BucketLayout {
     return this.equals(LEGACY);
   }
 
+  public boolean isObjectStore(boolean enableFileSystemPaths) {
+    if (this.equals(OBJECT_STORE)) {
+      return true;
+    } else {
+      // If bucket layout is Legacy and FileSystemPaths
+      // are disabled, then the bucket operates as OBS.
+      if (this.equals(LEGACY) && !enableFileSystemPaths) {
+        return true;
+      }
+      return false;
+    }
+  }
+
   public boolean shouldNormalizePaths(boolean enableFileSystemPaths) {
     switch (this) {
     case OBJECT_STORE:
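
The new isObjectStore(boolean) check above reads as nested branches but reduces to a single boolean expression; a minimal, functionally equivalent sketch (illustration only, not part of the patch):

    // A bucket behaves as OBS if it is OBJECT_STORE, or if it is LEGACY
    // while ozone.om.enable.filesystem.paths is disabled.
    public boolean isObjectStore(boolean enableFileSystemPaths) {
      return this.equals(OBJECT_STORE)
          || (this.equals(LEGACY) && !enableFileSystemPaths);
    }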
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/.env b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/.env
new file mode 100644
index 0000000000..2de359fc5d
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/.env
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HDDS_VERSION=${hdds.version}
+OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
+OZONE_RUNNER_IMAGE=apache/ozone-runner
+OZONE_OPTS=
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/README.md b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/README.md
new file mode 100644
index 0000000000..d31d8f20fb
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/README.md
@@ -0,0 +1,21 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# For Legacy Bucket Operations
+
+For Legacy buckets, set `ozone.om.enable.filesystem.paths` to `true` so that they behave like FSO buckets;
+otherwise Legacy buckets act like OBS buckets.
+
+This is the same as `compose/ozone`, but for testing operations that need the
+`ozone.om.enable.filesystem.paths` flag enabled.
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/docker-compose.yaml
new file mode 100644
index 0000000000..72303abaf6
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/docker-compose.yaml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.4"
+
+# reusable fragments (see https://docs.docker.com/compose/compose-file/#extension-fields)
+x-common-config:
+  &common-config
+  image: ${OZONE_RUNNER_IMAGE}:${OZONE_RUNNER_VERSION}
+  volumes:
+    - ../..:/opt/hadoop
+  env_file:
+    - docker-config
+
+x-replication:
+  &replication
+  OZONE-SITE.XML_ozone.replication: ${OZONE_REPLICATION_FACTOR:-1}
+
+services:
+  datanode:
+    <<: *common-config
+    ports:
+      - 9864
+      - 9882
+    environment:
+      <<: *replication
+      OZONE_OPTS:
+    command: ["ozone","datanode"]
+  om:
+    <<: *common-config
+    environment:
+      ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
+      OZONE_OPTS:
+      <<: *replication
+    ports:
+      - 9874:9874
+      - 9862:9862
+    command: ["ozone","om"]
+  scm:
+    <<: *common-config
+    ports:
+      - 9876:9876
+      - 9860:9860
+    environment:
+      ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
+      OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1}
+      OZONE_OPTS:
+      <<: *replication
+    command: ["ozone","scm"]
+  s3g:
+    <<: *common-config
+    environment:
+      OZONE_OPTS:
+      <<: *replication
+    ports:
+      - 9878:9878
+    command: ["ozone","s3g"]
+  recon:
+    <<: *common-config
+    ports:
+      - 9888:9888
+    environment:
+      OZONE_OPTS:
+      <<: *replication
+    command: ["ozone","recon"]
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/docker-config
new file mode 100644
index 0000000000..90d62dcd00
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/docker-config
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CORE-SITE.XML_fs.defaultFS=ofs://om
+CORE-SITE.XML_fs.trash.interval=1
+
+OZONE-SITE.XML_ozone.om.address=om
+OZONE-SITE.XML_ozone.om.enable.filesystem.paths=true
+OZONE-SITE.XML_ozone.default.bucket.layout=LEGACY
+OZONE-SITE.XML_ozone.om.http-address=om:9874
+OZONE-SITE.XML_ozone.scm.http-address=scm:9876
+OZONE-SITE.XML_ozone.scm.container.size=1GB
+OZONE-SITE.XML_ozone.scm.block.size=1MB
+OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
+OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
+OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
+OZONE-SITE.XML_ozone.scm.names=scm
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.block.client.address=scm
+OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
+OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
+OZONE-SITE.XML_ozone.scm.client.address=scm
+OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
+OZONE-SITE.XML_ozone.recon.address=recon:9891
+OZONE-SITE.XML_ozone.recon.http-address=0.0.0.0:9888
+OZONE-SITE.XML_ozone.recon.https-address=0.0.0.0:9889
+OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m
+OZONE-SITE.XML_ozone.datanode.pipeline.limit=1
+OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
+OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
+OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
+OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
+OZONE-SITE.XML_hdds.heartbeat.interval=5s
+
+OZONE_CONF_DIR=/etc/hadoop
+OZONE_LOG_DIR=/var/log/hadoop
+
+no_proxy=om,scm,s3g,recon,kdc,localhost,127.0.0.1
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/test.sh b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/test.sh
new file mode 100644
index 0000000000..4f776686a5
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone-legacy-bucket/test.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#suite:unsecure
+
+COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+export COMPOSE_DIR
+
+export SECURITY_ENABLED=false
+export OZONE_REPLICATION_FACTOR=3
+
+# shellcheck source=/dev/null
+source "$COMPOSE_DIR/../testlib.sh"
+
+start_docker_env 5
+
+execute_robot_test scm -v BUCKET_LAYOUT:LEGACY recon/recon-nssummary.robot
+
+stop_docker_env
+
+generate_report
diff --git a/hadoop-ozone/dist/src/main/smoketest/recon/recon-fso-nssummary.robot b/hadoop-ozone/dist/src/main/smoketest/recon/recon-nssummary.robot
similarity index 92%
rename from hadoop-ozone/dist/src/main/smoketest/recon/recon-fso-nssummary.robot
rename to hadoop-ozone/dist/src/main/smoketest/recon/recon-nssummary.robot
index 5994e487f9..86ca25e219 100644
--- a/hadoop-ozone/dist/src/main/smoketest/recon/recon-fso-nssummary.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/recon/recon-nssummary.robot
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 *** Settings ***
-Documentation       Smoke test for Recon Namespace Summary Endpoint for FSO buckets.
+Documentation       Smoke test for Recon Namespace Summary Endpoint for ${BUCKET_LAYOUT} buckets.
 Library             OperatingSystem
 Library             String
 Library             BuiltIn
@@ -29,6 +29,7 @@ ${SUMMARY_URL}              ${ADMIN_NAMESPACE_URL}/summary
 ${DISK_USAGE_URL}           ${ADMIN_NAMESPACE_URL}/du
 ${QUOTA_USAGE_URL}          ${ADMIN_NAMESPACE_URL}/quota
 ${FILE_SIZE_DIST_URL}       ${ADMIN_NAMESPACE_URL}/dist
+${BUCKET_LAYOUT}            FILE_SYSTEM_OPTIMIZED
 ${VOLUME}
 ${BUCKET}
 
@@ -42,7 +43,7 @@ Create volume
 Create bucket
     ${random} =     Generate Random String  5  [LOWER]
                     Set Suite Variable     ${BUCKET}    buc-${random}
-    ${result} =     Execute             ozone sh bucket create -l FILE_SYSTEM_OPTIMIZED /${VOLUME}/${BUCKET}
+    ${result} =     Execute             ozone sh bucket create -l ${BUCKET_LAYOUT} /${VOLUME}/${BUCKET}
                     Should not contain  ${result}       Failed
 
 Create keys
@@ -83,7 +84,7 @@ Check Access
     kinit as recon admin
     Check http return code      ${url}       200
 
-Test Summary                            
+Test Summary
     [Arguments]         ${url}        ${expected}
            ${result} =         Execute                              curl --negotiate -u : -LSs ${url}
                                Should contain      ${result}       \"status\":\"OK\"
@@ -131,7 +132,8 @@ Check Recon Namespace Summary Key
     Wait For Summary      ${SUMMARY_URL}?path=/${VOLUME}/${BUCKET}/file1   KEY
 
 Check Recon Namespace Summary Directory
-    Wait For Summary      ${SUMMARY_URL}?path=/${VOLUME}/${BUCKET}/dir1/dir2   DIRECTORY
+    Run Keyword If    '${BUCKET_LAYOUT}' == 'LEGACY'                    Wait For Summary      ${SUMMARY_URL}?path=/${VOLUME}/${BUCKET}/dir1/dir2/   DIRECTORY
+    Run Keyword If    '${BUCKET_LAYOUT}' == 'FILE_SYSTEM_OPTIMIZED'     Wait For Summary      ${SUMMARY_URL}?path=/${VOLUME}/${BUCKET}/dir1/dir2    DIRECTORY
 
 Check Recon Namespace Disk Usage
     Wait For Summary      ${DISK_USAGE_URL}?path=/${VOLUME}/${BUCKET}&files=true&replica=true     \"sizeWithReplica\"
@@ -143,4 +145,4 @@ Check Recon Namespace Bucket Quota Usage
     Wait For Summary      ${QUOTA_USAGE_URL}?path=/${VOLUME}/${BUCKET}   \"used\"
 
 Check Recon Namespace File Size Distribution Root
-    Wait For Summary      ${FILE_SIZE_DIST_URL}?path=/                   \"dist\"
+    Wait For Summary      ${FILE_SIZE_DIST_URL}?path=/                   \"dist\"
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestNSSummaryAdmin.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestNSSummaryAdmin.java
index 69c4a762e6..c0b5e82a0b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestNSSummaryAdmin.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestNSSummaryAdmin.java
@@ -110,7 +110,7 @@ public class TestNSSummaryAdmin extends StandardOutputTestBase {
     // Should throw warning - only buckets can have bucket layout.
     Assert.assertTrue(
         getOutContentString().contains(
-            "[Warning] Namespace CLI is only designed for FSO mode."));
+            "[Warning] Namespace CLI is not designed for OBS bucket layout."));
     Assert.assertTrue(getOutContentString()
         .contains("Put more files into it to visualize DU"));
     Assert.assertTrue(getOutContentString().contains(
@@ -128,7 +128,7 @@ public class TestNSSummaryAdmin extends StandardOutputTestBase {
     // Should not throw warning, since bucket is in FSO bucket layout.
     Assert.assertFalse(
         getOutContentString().contains(
-            "[Warning] Namespace CLI is only designed for FSO mode."));
+            "[Warning] Namespace CLI is not designed for OBS bucket layout."));
     Assert.assertTrue(getOutContentString()
         .contains("Put more files into it to visualize DU"));
     Assert.assertTrue(getOutContentString().contains(
@@ -146,7 +146,7 @@ public class TestNSSummaryAdmin extends StandardOutputTestBase {
     // Should throw warning, since bucket is in OBS bucket layout.
     Assert.assertTrue(
         getOutContentString().contains(
-            "[Warning] Namespace CLI is only designed for FSO mode."));
+            "[Warning] Namespace CLI is not designed for OBS bucket layout."));
     Assert.assertTrue(getOutContentString()
         .contains("Put more files into it to visualize DU"));
     Assert.assertTrue(getOutContentString().contains(
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 6892524e82..61fefabee5 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
-import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO;
+import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
 import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
 import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
 import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
@@ -126,7 +126,7 @@ public class ReconControllerModule extends AbstractModule {
       taskBinder.addBinding().to(ContainerKeyMapperTask.class);
       taskBinder.addBinding().to(FileSizeCountTask.class);
       taskBinder.addBinding().to(TableCountTask.class);
-      taskBinder.addBinding().to(NSSummaryTaskWithFSO.class);
+      taskBinder.addBinding().to(NSSummaryTask.class);
     }
   }
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/BucketHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/BucketHandler.java
index 2815fb6d13..d12c3cdb6a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/BucketHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/BucketHandler.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
+
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.helpers.OzoneFSUtils.removeTrailingSlashIfNeeded;
 
@@ -183,8 +185,24 @@ public abstract class BucketHandler {
                 OzoneStorageContainerManager reconSCM,
                 OmBucketInfo bucketInfo) throws IOException {
 
-    return new FSOBucketHandler(reconNamespaceSummaryManager,
-              omMetadataManager, reconSCM, bucketInfo);
+    // If bucketInfo is null then entity type is UNKNOWN
+    if (Objects.isNull(bucketInfo)) {
+      return null;
+    } else {
+      if (bucketInfo.getBucketLayout()
+          .equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+        return new FSOBucketHandler(reconNamespaceSummaryManager,
+            omMetadataManager, reconSCM, bucketInfo);
+      } else if (bucketInfo.getBucketLayout()
+          .equals(BucketLayout.LEGACY)) {
+        return new LegacyBucketHandler(reconNamespaceSummaryManager,
+            omMetadataManager, reconSCM, bucketInfo);
+      } else {
+        LOG.error("Unsupported bucket layout: " +
+            bucketInfo.getBucketLayout());
+        return null;
+      }
+    }
   }
 
   public static BucketHandler getBucketHandler(
@@ -197,11 +215,7 @@ public abstract class BucketHandler {
     OmBucketInfo bucketInfo = omMetadataManager
         .getBucketTable().getSkipCache(bucketKey);
 
-    if (bucketInfo == null) {
-      return null;
-    } else {
-      return getBucketHandler(reconNamespaceSummaryManager,
-          omMetadataManager, reconSCM, bucketInfo);
-    }
+    return getBucketHandler(reconNamespaceSummaryManager,
+        omMetadataManager, reconSCM, bucketInfo);
   }
 }
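
After this change getBucketHandler dispatches on bucket layout and returns null both for a missing bucket and for an unsupported layout; a hedged sketch of a call site (variable names assumed, contract as shown above):

    // Resolve a layout-specific handler (FSO or Legacy) and bail out
    // early when the bucket is missing or its layout is unsupported
    // (both cases now return null from the factory).
    OmBucketInfo bucketInfo = omMetadataManager
        .getBucketTable().getSkipCache(bucketKey);
    BucketHandler handler = BucketHandler.getBucketHandler(
        reconNamespaceSummaryManager, omMetadataManager, reconSCM,
        bucketInfo);
    if (handler == null) {
      return;  // entity type is UNKNOWN; nothing to summarize
    }
    EntityType type = handler.determineKeyPath(keyName);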
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/DirectoryEntityHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/DirectoryEntityHandler.java
index f1058ddf3d..0cfa6f1b47 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/DirectoryEntityHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/DirectoryEntityHandler.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -89,7 +91,23 @@ public class DirectoryEntityHandler extends EntityHandler {
     for (long subdirObjectId: subdirs) {
       NSSummary subdirNSSummary =
               getReconNamespaceSummaryManager().getNSSummary(subdirObjectId);
-      String subdirName = subdirNSSummary.getDirName();
+      // For the subdirName we need the subdir file name, not the key name.
+      // E.g. for /vol/bucket1/dir1/dir2,
+      // the key name is /dir1/dir2,
+      // but we only need dir2.
+      Path subdirPath = Paths.get(subdirNSSummary.getDirName());
+      Path subdirFileName = subdirPath.getFileName();
+      String subdirName;
+      // Checking for null to get rid of a findbugs error and
+      // then throwing a NullPointerException instead of swallowing it.
+      // Error: Possible null pointer dereference in
+      // ...DirectoryEntityHandler.getDuResponse(boolean, boolean) due to
+      // return value of called method Dereferenced at DirectoryEntityHandler
+      if (subdirFileName != null) {
+        subdirName = subdirFileName.toString();
+      } else {
+        throw new NullPointerException("Subdirectory file name is null.");
+      }
       // build the path for subdirectory
       String subpath = BucketHandler
               .buildSubpath(getNormalizedPath(), subdirName);
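
The Paths.get(...).getFileName() call above extracts only the last path component from the stored dir name; a self-contained sketch of the JDK behavior the null check guards against:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class LastComponent {
      public static void main(String[] args) {
        // For a stored dir name like "dir1/dir2" we want only "dir2".
        Path p = Paths.get("dir1/dir2");
        System.out.println(p.getFileName());                // dir2
        // getFileName() may return null (e.g. for the root path), which
        // is why the patch null-checks before calling toString().
        System.out.println(Paths.get("/").getFileName());   // null
      }
    }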
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/LegacyBucketHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/LegacyBucketHandler.java
new file mode 100644
index 0000000000..e4d218fed9
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/handlers/LegacyBucketHandler.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.recon.api.handlers;
+
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.recon.api.types.DUResponse;
+import org.apache.hadoop.ozone.recon.api.types.EntityType;
+import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Class for handling Legacy buckets.
+ */
+public class LegacyBucketHandler extends BucketHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      LegacyBucketHandler.class);
+
+  private final String vol;
+  private final String bucket;
+  private final OmBucketInfo omBucketInfo;
+
+  public LegacyBucketHandler(
+      ReconNamespaceSummaryManager reconNamespaceSummaryManager,
+      ReconOMMetadataManager omMetadataManager,
+      OzoneStorageContainerManager reconSCM,
+      OmBucketInfo bucketInfo) {
+    super(reconNamespaceSummaryManager, omMetadataManager,
+        reconSCM);
+    this.omBucketInfo = bucketInfo;
+    this.vol = omBucketInfo.getVolumeName();
+    this.bucket = omBucketInfo.getBucketName();
+  }
+
+  /**
+   * Helper function to check if a path is a directory, key, or invalid.
+   * @param keyName key name
+   * @return DIRECTORY, KEY, or UNKNOWN
+   * @throws IOException
+   */
+  @Override
+  public EntityType determineKeyPath(String keyName)
+      throws IOException {
+
+    String filename = OzoneFSUtils.removeTrailingSlashIfNeeded(keyName);
+    // For example, /vol1/buck1/a/b/c/d/e/file1.txt
+    // Look in the KeyTable for the key path,
+    // if the first one we seek to is the same as the seek key,
+    // it is a key;
+    // if it is the seekKey with a trailing slash, it is a directory
+    // else it is unknown
+    String key = OM_KEY_PREFIX + vol +
+        OM_KEY_PREFIX + bucket +
+        OM_KEY_PREFIX + filename;
+
+    Table<String, OmKeyInfo> keyTable = getKeyTable();
+
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+        iterator = keyTable.iterator();
+
+    iterator.seek(key);
+    if (iterator.hasNext()) {
+      Table.KeyValue<String, OmKeyInfo> kv = iterator.next();
+      String dbKey = kv.getKey();
+      if (dbKey.equals(key)) {
+        return EntityType.KEY;
+      }
+      if (dbKey.equals(key + OM_KEY_PREFIX)) {
+        return EntityType.DIRECTORY;
+      }
+    }
+    return EntityType.UNKNOWN;
+  }
+
+  /**
+   * KeyTable keys are in the format "vol/bucket/keyName".
+   * Make use of RocksDB's ordering to seek to the prefix and avoid a full
+   * iteration. DU is calculated only for keys; directories are skipped
+   * and only direct keys are handled.
+   * @param parentId
+   * @return total DU of direct keys under object
+   * @throws IOException
+   */
+  @Override
+  public long calculateDUUnderObject(long parentId)
+      throws IOException {
+    Table<String, OmKeyInfo> keyTable = getKeyTable();
+
+    long totalDU = 0L;
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+        iterator = keyTable.iterator();
+
+    String seekPrefix = OM_KEY_PREFIX +
+        vol +
+        OM_KEY_PREFIX +
+        bucket +
+        OM_KEY_PREFIX;
+
+    NSSummary nsSummary = getReconNamespaceSummaryManager()
+        .getNSSummary(parentId);
+    // empty bucket
+    if (nsSummary == null) {
+      return 0;
+    }
+
+    if (omBucketInfo.getObjectID() != parentId) {
+      String dirName = nsSummary.getDirName();
+      seekPrefix += dirName;
+    }
+
+    String[] seekKeys = seekPrefix.split(OM_KEY_PREFIX);
+    iterator.seek(seekPrefix);
+    // handle direct keys
+    while (iterator.hasNext()) {
+      Table.KeyValue<String, OmKeyInfo> kv = iterator.next();
+      String dbKey = kv.getKey();
+      // since the RocksDB is ordered, seek until the prefix isn't matched
+      if (!dbKey.startsWith(seekPrefix)) {
+        break;
+      }
+
+      String[] keys = dbKey.split(OM_KEY_PREFIX);
+
+      // iteration moved to the next level
+      // and not handling direct keys
+      if (keys.length - seekKeys.length > 1) {
+        continue;
+      }
+
+      OmKeyInfo keyInfo = kv.getValue();
+      if (keyInfo != null) {
+        // skip directory markers, just include directKeys
+        if (keyInfo.getKeyName().endsWith(OM_KEY_PREFIX)) {
+          continue;
+        }
+        totalDU += getKeySizeWithReplication(keyInfo);
+      }
+    }
+
+    // handle nested keys (DFS)
+    Set<Long> subDirIds = nsSummary.getChildDir();
+    for (long subDirId: subDirIds) {
+      totalDU += calculateDUUnderObject(subDirId);
+    }
+    return totalDU;
+  }
+
+  /**
+   * This method handles disk usage of direct keys.
+   * @param parentId parent directory/bucket
+   * @param withReplica if withReplica is enabled, set sizeWithReplica
+   * for each direct key's DU
+   * @param listFile if listFile is enabled, append key DU as a subpath
+   * @param duData the current DU data
+   * @param normalizedPath the normalized path request
+   * @return the total DU of all direct keys
+   * @throws IOException IOE
+   */
+  @Override
+  public long handleDirectKeys(long parentId, boolean withReplica,
+                               boolean listFile,
+                               List<DUResponse.DiskUsage> duData,
+                               String normalizedPath) throws IOException {
+
+    Table<String, OmKeyInfo> keyTable = getKeyTable();
+    long keyDataSizeWithReplica = 0L;
+
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+        iterator = keyTable.iterator();
+
+    String seekPrefix = OM_KEY_PREFIX +
+        vol +
+        OM_KEY_PREFIX +
+        bucket +
+        OM_KEY_PREFIX;
+
+    NSSummary nsSummary = getReconNamespaceSummaryManager()
+        .getNSSummary(parentId);
+    // empty bucket
+    if (nsSummary == null) {
+      return 0;
+    }
+
+    if (omBucketInfo.getObjectID() != parentId) {
+      String dirName = nsSummary.getDirName();
+      seekPrefix += dirName;
+    }
+    String[] seekKeys = seekPrefix.split(OM_KEY_PREFIX);
+    iterator.seek(seekPrefix);
+
+    while (iterator.hasNext()) {
+      Table.KeyValue<String, OmKeyInfo> kv = iterator.next();
+      String dbKey = kv.getKey();
+
+      if (!dbKey.startsWith(seekPrefix)) {
+        break;
+      }
+
+      String[] keys = dbKey.split(OM_KEY_PREFIX);
+
+      // iteration moved to the next level
+      // and not handling direct keys
+      if (keys.length - seekKeys.length > 1) {
+        continue;
+      }
+
+      OmKeyInfo keyInfo = kv.getValue();
+      if (keyInfo != null) {
+        // skip directory markers, just include directKeys
+        if (keyInfo.getKeyName().endsWith(OM_KEY_PREFIX)) {
+          continue;
+        }
+        DUResponse.DiskUsage diskUsage = new DUResponse.DiskUsage();
+        String subpath = buildSubpath(normalizedPath,
+            keyInfo.getFileName());
+        diskUsage.setSubpath(subpath);
+        diskUsage.setKey(true);
+        diskUsage.setSize(keyInfo.getDataSize());
+
+        if (withReplica) {
+          long keyDU = getKeySizeWithReplication(keyInfo);
+          keyDataSizeWithReplica += keyDU;
+          diskUsage.setSizeWithReplica(keyDU);
+        }
+        // list the key as a subpath
+        if (listFile) {
+          duData.add(diskUsage);
+        }
+      }
+    }
+
+    return keyDataSizeWithReplica;
+  }
+
+  /**
+   * Given a valid path request for a directory,
+   * return the directory object ID.
+   * @param names parsed path request in a list of names
+   * @return directory object ID
+   */
+  @Override
+  public long getDirObjectId(String[] names) throws IOException {
+    return getDirObjectId(names, names.length);
+  }
+
+  /**
+   * Given a valid path request and a cutoff length up to which the path
+   * should be iterated, return the directory object ID for the object
+   * at the cutoff length.
+   * @param names parsed path request in a list of names
+   * @param cutoff cannot be larger than the names' length. If equals,
+   *               return the directory object id for the whole path
+   * @return directory object ID
+   */
+  @Override
+  public long getDirObjectId(String[] names, int cutoff) throws IOException {
+    long dirObjectId = getBucketObjectId(names);
+    StringBuilder bld = new StringBuilder();
+    for (int i = 0; i < cutoff; ++i) {
+      bld.append(OM_KEY_PREFIX)
+          .append(names[i]);
+    }
+    bld.append(OM_KEY_PREFIX);
+    String dirKey = bld.toString();
+    OmKeyInfo dirInfo = getKeyTable().getSkipCache(dirKey);
+
+    if (dirInfo != null) {
+      dirObjectId = dirInfo.getObjectID();
+    } else {
+      throw new IOException("OmKeyInfo for the directory is null");
+    }
+
+    return dirObjectId;
+  }
+
+  @Override
+  public BucketLayout getBucketLayout() {
+    return BucketLayout.LEGACY;
+  }
+
+  @Override
+  public OmKeyInfo getKeyInfo(String[] names) throws IOException {
+    String ozoneKey = OM_KEY_PREFIX;
+    ozoneKey += String.join(OM_KEY_PREFIX, names);
+
+    OmKeyInfo keyInfo = getKeyTable().getSkipCache(ozoneKey);
+    return keyInfo;
+  }
+
+  public Table<String, OmKeyInfo> getKeyTable() {
+    // Delegate to the OM metadata manager for the LEGACY key table,
+    // avoiding a raw Table local.
+    return getOmMetadataManager().getKeyTable(getBucketLayout());
+  }
+}
\ No newline at end of file
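
Both calculateDUUnderObject and handleDirectKeys decide whether a key is a direct child by comparing the slash-split depths of the db key and the seek prefix; a standalone sketch of that filter (OM_KEY_PREFIX is "/" in Ozone):

    // Keys more than one level below the seek prefix are skipped here and
    // picked up later by the recursive walk over NSSummary child dirs.
    static boolean isDirectKey(String dbKey, String seekPrefix) {
      String[] keys = dbKey.split("/");
      String[] seekKeys = seekPrefix.split("/");
      return dbKey.startsWith(seekPrefix)
          && keys.length - seekKeys.length <= 1;
    }

    // isDirectKey("/vol/bucket/dir1/file1", "/vol/bucket/dir1/")      -> true
    // isDirectKey("/vol/bucket/dir1/dir2/file2", "/vol/bucket/dir1/") -> false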
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/NSSummary.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/NSSummary.java
index 1b22081f52..eeb5014991 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/NSSummary.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/NSSummary.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.HashSet;
 import java.util.Arrays;
 
+import static org.apache.hadoop.ozone.om.helpers.OzoneFSUtils.removeTrailingSlashIfNeeded;
+
 /**
  * Class to encapsulate namespace metadata summaries from OM.
  */
@@ -90,7 +92,7 @@ public class NSSummary {
   }
 
   public void setDirName(String dirName) {
-    this.dirName = dirName;
+    this.dirName = removeTrailingSlashIfNeeded(dirName);
   }
 
   public void addChildDir(long childId) {
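
With setDirName normalized above, Legacy directory entries (whose KeyTable names carry a trailing slash) are stored under the same slash-free form FSO uses; a small sketch of the effect, assuming removeTrailingSlashIfNeeded strips one trailing "/":

    NSSummary summary = new NSSummary();
    summary.setDirName("dir1/dir2/");   // Legacy-style directory marker name
    // getDirName() now returns "dir1/dir2", matching FSO dir names, so
    // handlers can build subpaths uniformly for both layouts.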
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
index 7baeefdbe4..63b6ee375c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
@@ -17,25 +17,30 @@
  */
 
 package org.apache.hadoop.ozone.recon.tasks;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.recon.ReconUtils;
-import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.io.IOException;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 
 /**
  * Task to query data from OMDB and write into Recon RocksDB.
- * Reprocess() will take a snapshots on OMDB, and iterate the keyTable and
- * dirTable to write all information to RocksDB.
+ * Reprocess() will take a snapshot of OMDB, and iterate the keyTable,
+ * the fileTable and the dirTable to write all information to RocksDB.
  *
  * For FSO-enabled keyTable (fileTable), we need to fetch the parent object
  * (bucket or directory), increment its numOfKeys by 1, increase its sizeOfKeys
@@ -44,166 +49,99 @@ import java.util.Map;
  * For dirTable, we need to fetch the parent object (bucket or directory),
  * add the current directory's objectID to the parent object's childDir field.
  *
+ * For keyTable, the parent object is not directly available, so we fetch
+ * it, attach it to the current object, and reuse the existing FSO methods.
+ * Only entries that belong to Legacy buckets are processed. If an entry
+ * refers to a directory, a directory info object is built from it.
+ *
  * Process() will write all OMDB updates to RocksDB.
- * The write logic is the same as above. For update action, we will treat it as
+ * Write logic is the same as above. For update action, we will treat it as
  * delete old value first, and write updated value then.
  */
-public abstract class NSSummaryTask implements ReconOmTask {
+public class NSSummaryTask implements ReconOmTask {
   private static final Logger LOG =
           LoggerFactory.getLogger(NSSummaryTask.class);
 
   private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+  private final ReconOMMetadataManager reconOMMetadataManager;
+  private final NSSummaryTaskWithFSO nsSummaryTaskWithFSO;
+  private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy;
+  private final OzoneConfiguration ozoneConfiguration;
 
   @Inject
   public NSSummaryTask(ReconNamespaceSummaryManager
-                                 reconNamespaceSummaryManager) {
+                       reconNamespaceSummaryManager,
+                       ReconOMMetadataManager
+                       reconOMMetadataManager,
+                       OzoneConfiguration
+                       ozoneConfiguration) {
     this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
+    this.reconOMMetadataManager = reconOMMetadataManager;
+    this.ozoneConfiguration = ozoneConfiguration;
+    this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
+        reconNamespaceSummaryManager, reconOMMetadataManager);
+    this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
+        reconNamespaceSummaryManager,
+        reconOMMetadataManager, ozoneConfiguration);
   }
 
-  public ReconNamespaceSummaryManager getReconNamespaceSummaryManager() {
-    return reconNamespaceSummaryManager;
-  }
-
-  public abstract String getTaskName();
-
-  public abstract Pair<String, Boolean> process(OMUpdateEventBatch events);
-
-  public abstract Pair<String, Boolean> reprocess(
-      OMMetadataManager omMetadataManager);
-
-  protected void writeNSSummariesToDB(Map<Long, NSSummary> nsSummaryMap)
-      throws IOException {
-    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
-      nsSummaryMap.keySet().forEach((Long key) -> {
-        try {
-          reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation,
-              key, nsSummaryMap.get(key));
-        } catch (IOException e) {
-          LOG.error("Unable to write Namespace Summary data in Recon DB.",
-              e);
-        }
-      });
-      reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
-    }
-  }
-
-  protected void handlePutKeyEvent(OmKeyInfo keyInfo, Map<Long,
-      NSSummary> nsSummaryMap) throws IOException {
-    long parentObjectId = keyInfo.getParentObjectID();
-    // Try to get the NSSummary from our local map that maps NSSummaries to IDs
-    NSSummary nsSummary = nsSummaryMap.get(parentObjectId);
-    if (nsSummary == null) {
-      // If we don't have it in this batch we try to get it from the DB
-      nsSummary = reconNamespaceSummaryManager.getNSSummary(parentObjectId);
-    }
-    if (nsSummary == null) {
-      // If we don't have it locally and in the DB we create a new instance
-      // as this is a new ID
-      nsSummary = new NSSummary();
-    }
-    int numOfFile = nsSummary.getNumOfFiles();
-    long sizeOfFile = nsSummary.getSizeOfFiles();
-    int[] fileBucket = nsSummary.getFileSizeBucket();
-    nsSummary.setNumOfFiles(numOfFile + 1);
-    long dataSize = keyInfo.getDataSize();
-    nsSummary.setSizeOfFiles(sizeOfFile + dataSize);
-    int binIndex = ReconUtils.getBinIndex(dataSize);
-
-    ++fileBucket[binIndex];
-    nsSummary.setFileSizeBucket(fileBucket);
-    nsSummaryMap.put(parentObjectId, nsSummary);
-  }
-
-  protected void handlePutDirEvent(OmDirectoryInfo directoryInfo,
-                                 Map<Long, NSSummary> nsSummaryMap)
-          throws IOException {
-    long parentObjectId = directoryInfo.getParentObjectID();
-    long objectId = directoryInfo.getObjectID();
-    // write the dir name to the current directory
-    String dirName = directoryInfo.getName();
-    // Try to get the NSSummary from our local map that maps NSSummaries to IDs
-    NSSummary curNSSummary = nsSummaryMap.get(objectId);
-    if (curNSSummary == null) {
-      // If we don't have it in this batch we try to get it from the DB
-      curNSSummary = reconNamespaceSummaryManager.getNSSummary(objectId);
-    }
-    if (curNSSummary == null) {
-      // If we don't have it locally and in the DB we create a new instance
-      // as this is a new ID
-      curNSSummary = new NSSummary();
-    }
-    curNSSummary.setDirName(dirName);
-    nsSummaryMap.put(objectId, curNSSummary);
-
-    // Write the child dir list to the parent directory
-    // Try to get the NSSummary from our local map that maps NSSummaries to IDs
-    NSSummary nsSummary = nsSummaryMap.get(parentObjectId);
-    if (nsSummary == null) {
-      // If we don't have it in this batch we try to get it from the DB
-      nsSummary = reconNamespaceSummaryManager.getNSSummary(parentObjectId);
-    }
-    if (nsSummary == null) {
-      // If we don't have it locally and in the DB we create a new instance
-      // as this is a new ID
-      nsSummary = new NSSummary();
-    }
-    nsSummary.addChildDir(objectId);
-    nsSummaryMap.put(parentObjectId, nsSummary);
+  @Override
+  public String getTaskName() {
+    return "NSSummaryTask";
   }
 
-  protected void handleDeleteKeyEvent(OmKeyInfo keyInfo,
-                                    Map<Long, NSSummary> nsSummaryMap)
-          throws IOException {
-    long parentObjectId = keyInfo.getParentObjectID();
-    // Try to get the NSSummary from our local map that maps NSSummaries to IDs
-    NSSummary nsSummary = nsSummaryMap.get(parentObjectId);
-    if (nsSummary == null) {
-      // If we don't have it in this batch we try to get it from the DB
-      nsSummary = reconNamespaceSummaryManager.getNSSummary(parentObjectId);
+  @Override
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    boolean success;
+    success = nsSummaryTaskWithFSO.processWithFSO(events);
+    if (success) {
+      success = nsSummaryTaskWithLegacy.processWithLegacy(events);
+    } else {
+      LOG.error("processWithFSO failed.");
     }
-
-    // Just in case the OmKeyInfo isn't correctly written.
-    if (nsSummary == null) {
-      LOG.error("The namespace table is not correctly populated.");
-      return;
-    }
-    int numOfFile = nsSummary.getNumOfFiles();
-    long sizeOfFile = nsSummary.getSizeOfFiles();
-    int[] fileBucket = nsSummary.getFileSizeBucket();
-
-    long dataSize = keyInfo.getDataSize();
-    int binIndex = ReconUtils.getBinIndex(dataSize);
-
-    // decrement count, data size, and bucket count
-    // even if there's no direct key, we still keep the entry because
-    // we still need children dir IDs info
-    nsSummary.setNumOfFiles(numOfFile - 1);
-    nsSummary.setSizeOfFiles(sizeOfFile - dataSize);
-    --fileBucket[binIndex];
-    nsSummary.setFileSizeBucket(fileBucket);
-    nsSummaryMap.put(parentObjectId, nsSummary);
+    return new ImmutablePair<>(getTaskName(), success);
   }
 
-  protected void handleDeleteDirEvent(OmDirectoryInfo directoryInfo,
-                                    Map<Long, NSSummary> nsSummaryMap)
-          throws IOException {
-    long parentObjectId = directoryInfo.getParentObjectID();
-    long objectId = directoryInfo.getObjectID();
-    // Try to get the NSSummary from our local map that maps NSSummaries to IDs
-    NSSummary nsSummary = nsSummaryMap.get(parentObjectId);
-    if (nsSummary == null) {
-      // If we don't have it in this batch we try to get it from the DB
-      nsSummary = reconNamespaceSummaryManager.getNSSummary(parentObjectId);
+  @Override
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+    Collection<Callable<Boolean>> tasks = new ArrayList<>();
+
+    try {
+      // reinit Recon RocksDB's namespace CF.
+      reconNamespaceSummaryManager.clearNSSummaryTable();
+    } catch (IOException ioEx) {
+      LOG.error("Unable to clear NSSummary table in Recon DB. ",
+          ioEx);
+      return new ImmutablePair<>(getTaskName(), false);
     }
 
-    // Just in case the OmDirectoryInfo isn't correctly written.
-    if (nsSummary == null) {
-      LOG.error("The namespace table is not correctly populated.");
-      return;
+    tasks.add(() -> nsSummaryTaskWithFSO
+        .reprocessWithFSO(omMetadataManager));
+    tasks.add(() -> nsSummaryTaskWithLegacy
+        .reprocessWithLegacy(reconOMMetadataManager));
+
+    List<Future<Boolean>> results;
+    ExecutorService executorService = Executors
+        .newFixedThreadPool(2);
+    try {
+      results = executorService.invokeAll(tasks);
+      for (int i = 0; i < results.size(); i++) {
+        if (results.get(i).get().equals(false)) {
+          return new ImmutablePair<>(getTaskName(), false);
+        }
+      }
+    } catch (InterruptedException ex) {
+      LOG.error("Error while reprocessing NSSummary " +
+          "table in Recon DB. ", ex);
+      return new ImmutablePair<>(getTaskName(), false);
+    } catch (ExecutionException ex2) {
+      LOG.error("Error while reprocessing NSSummary " +
+          "table in Recon DB. ", ex2);
+      return new ImmutablePair<>(getTaskName(), false);
+    } finally {
+      executorService.shutdown();
     }
-
-    nsSummary.removeChildDir(objectId);
-    nsSummaryMap.put(parentObjectId, nsSummary);
+    return new ImmutablePair<>(getTaskName(), true);
   }
 }
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
similarity index 79%
copy from hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
copy to hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
index 7baeefdbe4..4cadbf273a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
@@ -17,59 +17,48 @@
  */
 
 package org.apache.hadoop.ozone.recon.tasks;
-import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
 import java.io.IOException;
 import java.util.Map;
 
 /**
- * Task to query data from OMDB and write into Recon RocksDB.
- * Reprocess() will take a snapshots on OMDB, and iterate the keyTable and
- * dirTable to write all information to RocksDB.
- *
- * For FSO-enabled keyTable (fileTable), we need to fetch the parent object
- * (bucket or directory), increment its numOfKeys by 1, increase its sizeOfKeys
- * by the file data size, and update the file size distribution bin accordingly.
- *
- * For dirTable, we need to fetch the parent object (bucket or directory),
- * add the current directory's objectID to the parent object's childDir field.
- *
- * Process() will write all OMDB updates to RocksDB.
- * The write logic is the same as above. For update action, we will treat it as
- * delete old value first, and write updated value then.
+ * Class holding all NSSummaryTask methods
+ * related to DB operations, so that they can be shared
+ * by NSSummaryTaskWithFSO and NSSummaryTaskWithLegacy.
  */
-public abstract class NSSummaryTask implements ReconOmTask {
+public class NSSummaryTaskDbEventHandler {
+
   private static final Logger LOG =
-          LoggerFactory.getLogger(NSSummaryTask.class);
+      LoggerFactory.getLogger(NSSummaryTaskDbEventHandler.class);
 
-  private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+  private ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+  private ReconOMMetadataManager reconOMMetadataManager;
 
-  @Inject
-  public NSSummaryTask(ReconNamespaceSummaryManager
-                                 reconNamespaceSummaryManager) {
+  public NSSummaryTaskDbEventHandler(ReconNamespaceSummaryManager
+                                     reconNamespaceSummaryManager,
+                                     ReconOMMetadataManager
+                                     reconOMMetadataManager) {
     this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
+    this.reconOMMetadataManager = reconOMMetadataManager;
   }
 
   public ReconNamespaceSummaryManager getReconNamespaceSummaryManager() {
     return reconNamespaceSummaryManager;
   }
 
-  public abstract String getTaskName();
-
-  public abstract Pair<String, Boolean> process(OMUpdateEventBatch events);
-
-  public abstract Pair<String, Boolean> reprocess(
-      OMMetadataManager omMetadataManager);
+  public ReconOMMetadataManager getReconOMMetadataManager() {
+    return reconOMMetadataManager;
+  }
 
   protected void writeNSSummariesToDB(Map<Long, NSSummary> nsSummaryMap)
       throws IOException {
@@ -115,8 +104,8 @@ public abstract class NSSummaryTask implements ReconOmTask {
   }
 
   protected void handlePutDirEvent(OmDirectoryInfo directoryInfo,
-                                 Map<Long, NSSummary> nsSummaryMap)
-          throws IOException {
+                                   Map<Long, NSSummary> nsSummaryMap)
+      throws IOException {
     long parentObjectId = directoryInfo.getParentObjectID();
     long objectId = directoryInfo.getObjectID();
     // write the dir name to the current directory
@@ -152,8 +141,8 @@ public abstract class NSSummaryTask implements ReconOmTask {
   }
 
   protected void handleDeleteKeyEvent(OmKeyInfo keyInfo,
-                                    Map<Long, NSSummary> nsSummaryMap)
-          throws IOException {
+                                      Map<Long, NSSummary> nsSummaryMap)
+      throws IOException {
     long parentObjectId = keyInfo.getParentObjectID();
     // Try to get the NSSummary from our local map that maps NSSummaries to IDs
     NSSummary nsSummary = nsSummaryMap.get(parentObjectId);
@@ -185,8 +174,8 @@ public abstract class NSSummaryTask implements ReconOmTask {
   }
 
   protected void handleDeleteDirEvent(OmDirectoryInfo directoryInfo,
-                                    Map<Long, NSSummary> nsSummaryMap)
-          throws IOException {
+                                      Map<Long, NSSummary> nsSummaryMap)
+      throws IOException {
     long parentObjectId = directoryInfo.getParentObjectID();
     long objectId = directoryInfo.getObjectID();
     // Try to get the NSSummary from our local map that maps NSSummaries to IDs
@@ -206,4 +195,3 @@ public abstract class NSSummaryTask implements ReconOmTask {
     nsSummaryMap.put(parentObjectId, nsSummary);
   }
 }
-
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
index 1b8a0ce5a5..0f80927d83 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
@@ -17,8 +17,6 @@
  */
 
 package org.apache.hadoop.ozone.recon.tasks;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -26,11 +24,11 @@ import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -44,20 +42,16 @@ import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
 /**
  * Class for handling FSO specific tasks.
  */
-public class NSSummaryTaskWithFSO extends NSSummaryTask {
+public class NSSummaryTaskWithFSO extends NSSummaryTaskDbEventHandler {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(NSSummaryTaskWithFSO.class);
 
-  @Inject
   public NSSummaryTaskWithFSO(ReconNamespaceSummaryManager
-                            reconNamespaceSummaryManager) {
-    super(reconNamespaceSummaryManager);
-  }
-
-  @Override
-  public String getTaskName() {
-    return "NSSummaryTaskWithFSO";
+                              reconNamespaceSummaryManager,
+                              ReconOMMetadataManager
+                              reconOMMetadataManager) {
+    super(reconNamespaceSummaryManager, reconOMMetadataManager);
   }
 
   // We only listen to updates from FSO-enabled KeyTable(FileTable) and DirTable
@@ -65,8 +59,7 @@ public class NSSummaryTaskWithFSO extends NSSummaryTask {
     return Arrays.asList(FILE_TABLE, DIRECTORY_TABLE);
   }
 
-  @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public boolean processWithFSO(OMUpdateEventBatch events) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
     final Collection<String> taskTables = getTaskTables();
     Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
@@ -152,7 +145,7 @@ public class NSSummaryTaskWithFSO extends NSSummaryTask {
       } catch (IOException ioEx) {
         LOG.error("Unable to process Namespace Summary data in Recon DB. ",
                 ioEx);
-        return new ImmutablePair<>(getTaskName(), false);
+        return false;
       }
     }
 
@@ -160,21 +153,17 @@ public class NSSummaryTaskWithFSO extends NSSummaryTask {
       writeNSSummariesToDB(nsSummaryMap);
     } catch (IOException e) {
       LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
-      return new ImmutablePair<>(getTaskName(), false);
+      return false;
     }
 
     LOG.info("Completed a process run of NSSummaryTaskWithFSO");
-    return new ImmutablePair<>(getTaskName(), true);
+    return true;
   }
 
-  @Override
-  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
     Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
 
     try {
-      // reinit Recon RocksDB's namespace CF.
-      getReconNamespaceSummaryManager().clearNSSummaryTable();
-
       Table<String, OmDirectoryInfo> dirTable =
           omMetadataManager.getDirectoryTable();
       try (TableIterator<String,
@@ -188,7 +177,8 @@ public class NSSummaryTaskWithFSO extends NSSummaryTask {
       }
 
       // Get fileTable used by FSO
-      Table<String, OmKeyInfo> keyTable = omMetadataManager.getFileTable();
+      Table<String, OmKeyInfo> keyTable =
+          omMetadataManager.getFileTable();
 
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
               keyTableIter = keyTable.iterator()) {
@@ -202,16 +192,16 @@ public class NSSummaryTaskWithFSO extends NSSummaryTask {
     } catch (IOException ioEx) {
       LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
               ioEx);
-      return new ImmutablePair<>(getTaskName(), false);
+      return false;
     }
 
     try {
       writeNSSummariesToDB(nsSummaryMap);
     } catch (IOException e) {
       LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
-      return new ImmutablePair<>(getTaskName(), false);
+      return false;
     }
     LOG.info("Completed a reprocess run of NSSummaryTaskWithFSO");
-    return new ImmutablePair<>(getTaskName(), true);
+    return true;
   }
 }
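
For context on the refactor above: processWithFSO() and reprocessWithFSO() now
return a plain boolean, leaving the task-level Pair<String, Boolean> reporting
to a coordinating NSSummaryTask. A minimal sketch of that fan-out, assuming a
coordinator shape and field names that are not taken from this commit:

    import org.apache.commons.lang3.tuple.ImmutablePair;
    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.ozone.om.OMMetadataManager;

    public class NSSummaryCoordinatorSketch {
      // Hypothetical fields; the committed NSSummaryTask wires its
      // handlers differently (see the NSSummaryTask.java diff stats).
      private final NSSummaryTaskWithFSO fsoHandler;
      private final NSSummaryTaskWithLegacy legacyHandler;

      public NSSummaryCoordinatorSketch(NSSummaryTaskWithFSO fso,
                                        NSSummaryTaskWithLegacy legacy) {
        this.fsoHandler = fso;
        this.legacyHandler = legacy;
      }

      // Both layout handlers must succeed for the run to be reported
      // as successful to the Recon task framework.
      public Pair<String, Boolean> reprocess(OMMetadataManager om) {
        boolean fsoOk = fsoHandler.reprocessWithFSO(om);
        boolean legacyOk = legacyHandler.reprocessWithLegacy(om);
        return new ImmutablePair<>("NSSummaryTask", fsoOk && legacyOk);
      }
    }
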
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
new file mode 100644
index 0000000000..6e414a3b4e
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+
+/**
+ * Class for handling Legacy specific tasks.
+ */
+public class NSSummaryTaskWithLegacy extends NSSummaryTaskDbEventHandler {
+
+  private static final BucketLayout BUCKET_LAYOUT = BucketLayout.LEGACY;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NSSummaryTaskWithLegacy.class);
+
+  private boolean enableFileSystemPaths;
+
+  public NSSummaryTaskWithLegacy(ReconNamespaceSummaryManager
+                                 reconNamespaceSummaryManager,
+                                 ReconOMMetadataManager
+                                 reconOMMetadataManager,
+                                 OzoneConfiguration
+                                 ozoneConfiguration) {
+    super(reconNamespaceSummaryManager, reconOMMetadataManager);
+    // true if FileSystemPaths enabled
+    enableFileSystemPaths = ozoneConfiguration
+        .getBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+            OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT);
+  }
+
+  public boolean processWithLegacy(OMUpdateEventBatch events) {
+    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
+
+    while (eventIterator.hasNext()) {
+      OMDBUpdateEvent<String, ? extends
+          WithParentObjectId> omdbUpdateEvent = eventIterator.next();
+      OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
+
+      // we only process updates on OM's KeyTable
+      String table = omdbUpdateEvent.getTable();
+      boolean updateOnKeyTable = table.equals(KEY_TABLE);
+      if (!updateOnKeyTable) {
+        continue;
+      }
+
+      String updatedKey = omdbUpdateEvent.getKey();
+
+      try {
+        OMDBUpdateEvent<String, OmKeyInfo> keyTableUpdateEvent =
+            (OMDBUpdateEvent<String, OmKeyInfo>) omdbUpdateEvent;
+        OmKeyInfo updatedKeyInfo = keyTableUpdateEvent.getValue();
+        OmKeyInfo oldKeyInfo = keyTableUpdateEvent.getOldValue();
+
+        // KeyTable entries belong to both Legacy and OBS buckets.
+        // If the bucket layout is OBS, skip the event and
+        // continue to the next iteration.
+        // Only the current KeyInfo needs to be checked.
+        String volumeName = updatedKeyInfo.getVolumeName();
+        String bucketName = updatedKeyInfo.getBucketName();
+        String bucketDBKey = getReconOMMetadataManager()
+            .getBucketKey(volumeName, bucketName);
+        // Get bucket info from bucket table
+        OmBucketInfo omBucketInfo = getReconOMMetadataManager()
+            .getBucketTable().getSkipCache(bucketDBKey);
+
+        if (omBucketInfo.getBucketLayout()
+            .isObjectStore(enableFileSystemPaths)) {
+          continue;
+        }
+
+        setKeyParentID(updatedKeyInfo);
+
+        if (!updatedKeyInfo.getKeyName().endsWith(OM_KEY_PREFIX)) {
+          switch (action) {
+          case PUT:
+            handlePutKeyEvent(updatedKeyInfo, nsSummaryMap);
+            break;
+
+          case DELETE:
+            handleDeleteKeyEvent(updatedKeyInfo, nsSummaryMap);
+            break;
+
+          case UPDATE:
+            if (oldKeyInfo != null) {
+              // delete first, then put
+              setKeyParentID(oldKeyInfo);
+              handleDeleteKeyEvent(oldKeyInfo, nsSummaryMap);
+            } else {
+              LOG.warn("Update event does not have the old keyInfo for {}.",
+                  updatedKey);
+            }
+            handlePutKeyEvent(updatedKeyInfo, nsSummaryMap);
+            break;
+
+          default:
+            LOG.debug("Skipping DB update event : {}",
+                omdbUpdateEvent.getAction());
+          }
+        } else {
+          OmDirectoryInfo updatedDirectoryInfo =
+              new OmDirectoryInfo.Builder()
+                  .setName(updatedKeyInfo.getKeyName())
+                  .setObjectID(updatedKeyInfo.getObjectID())
+                  .setParentObjectID(updatedKeyInfo.getParentObjectID())
+                  .build();
+
+          OmDirectoryInfo oldDirectoryInfo = null;
+
+          if (oldKeyInfo != null) {
+            oldDirectoryInfo =
+                new OmDirectoryInfo.Builder()
+                    .setName(oldKeyInfo.getKeyName())
+                    .setObjectID(oldKeyInfo.getObjectID())
+                    .setParentObjectID(oldKeyInfo.getParentObjectID())
+                    .build();
+          }
+
+          switch (action) {
+          case PUT:
+            handlePutDirEvent(updatedDirectoryInfo, nsSummaryMap);
+            break;
+
+          case DELETE:
+            handleDeleteDirEvent(updatedDirectoryInfo, nsSummaryMap);
+            break;
+
+          case UPDATE:
+            if (oldDirectoryInfo != null) {
+              // delete first, then put
+              handleDeleteDirEvent(oldDirectoryInfo, nsSummaryMap);
+            } else {
+              LOG.warn("Update event does not have the old dirInfo for {}.",
+                  updatedKey);
+            }
+            handlePutDirEvent(updatedDirectoryInfo, nsSummaryMap);
+            break;
+
+          default:
+            LOG.debug("Skipping DB update event : {}",
+                omdbUpdateEvent.getAction());
+          }
+        }
+      } catch (IOException ioEx) {
+        LOG.error("Unable to process Namespace Summary data in Recon DB. ",
+            ioEx);
+        return false;
+      }
+    }
+
+    try {
+      writeNSSummariesToDB(nsSummaryMap);
+    } catch (IOException e) {
+      LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
+      return false;
+    }
+
+    LOG.info("Completed a process run of NSSummaryTaskWithLegacy");
+    return true;
+  }
+
+  public boolean reprocessWithLegacy(OMMetadataManager omMetadataManager) {
+    Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
+
+    try {
+      Table<String, OmKeyInfo> keyTable =
+          omMetadataManager.getKeyTable(BUCKET_LAYOUT);
+
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+          keyTableIter = keyTable.iterator()) {
+
+        while (keyTableIter.hasNext()) {
+          Table.KeyValue<String, OmKeyInfo> kv = keyTableIter.next();
+          OmKeyInfo keyInfo = kv.getValue();
+
+          // KeyTable entries belong to both Legacy and OBS buckets.
+          // If the bucket layout is OBS, skip the key and
+          // continue to the next iteration.
+          String volumeName = keyInfo.getVolumeName();
+          String bucketName = keyInfo.getBucketName();
+          String bucketDBKey = omMetadataManager
+              .getBucketKey(volumeName, bucketName);
+          // Get bucket info from bucket table
+          OmBucketInfo omBucketInfo = omMetadataManager
+              .getBucketTable().getSkipCache(bucketDBKey);
+
+          if (omBucketInfo.getBucketLayout()
+              .isObjectStore(enableFileSystemPaths)) {
+            continue;
+          }
+
+          setKeyParentID(keyInfo);
+
+          if (keyInfo.getKeyName().endsWith(OM_KEY_PREFIX)) {
+            OmDirectoryInfo directoryInfo =
+                new OmDirectoryInfo.Builder()
+                    .setName(keyInfo.getKeyName())
+                    .setObjectID(keyInfo.getObjectID())
+                    .setParentObjectID(keyInfo.getParentObjectID())
+                    .build();
+            handlePutDirEvent(directoryInfo, nsSummaryMap);
+          } else {
+            handlePutKeyEvent(keyInfo, nsSummaryMap);
+          }
+        }
+      }
+    } catch (IOException ioEx) {
+      LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
+          ioEx);
+      return false;
+    }
+
+    try {
+      writeNSSummariesToDB(nsSummaryMap);
+    } catch (IOException e) {
+      LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
+      return false;
+    }
+    LOG.info("Completed a reprocess run of NSSummaryTaskWithLegacy");
+    return true;
+  }
+
+  /**
+   * KeyTable entries don't have the parentId set.
+   * In order to reuse the existing FSO methods that rely on
+   * the parentId, we have to set it explicitly.
+   * @param keyInfo the key whose parent objectID is resolved and set
+   * @throws IOException if the parent key or bucket cannot be read
+   */
+  private void setKeyParentID(OmKeyInfo keyInfo) throws IOException {
+    String[] keyPath = keyInfo.getKeyName().split(OM_KEY_PREFIX);
+
+    // If the path consists of a single element then keyPath.length
+    // will be 1 and the parent is the bucket itself.
+    // If keyPath.length is greater than 1 then
+    // there is at least one parent directory.
+    if (keyPath.length > 1) {
+      String[] dirs = Arrays.copyOf(keyPath, keyPath.length - 1);
+      String parentKeyName = String.join(OM_KEY_PREFIX, dirs);
+      parentKeyName += OM_KEY_PREFIX;
+      String fullParentKeyName =
+          getReconOMMetadataManager().getOzoneKey(keyInfo.getVolumeName(),
+              keyInfo.getBucketName(), parentKeyName);
+      OmKeyInfo parentKeyInfo = getReconOMMetadataManager()
+          .getKeyTable(BUCKET_LAYOUT)
+          .getSkipCache(fullParentKeyName);
+
+      if (parentKeyInfo != null) {
+        keyInfo.setParentObjectID(parentKeyInfo.getObjectID());
+      } else {
+        throw new IOException("ParentKeyInfo for " +
+            "NSSummaryTaskWithLegacy is null");
+      }
+    } else {
+      String bucketKey = getReconOMMetadataManager()
+          .getBucketKey(keyInfo.getVolumeName(), keyInfo.getBucketName());
+      OmBucketInfo parentBucketInfo =
+          getReconOMMetadataManager().getBucketTable().getSkipCache(bucketKey);
+
+      if (parentBucketInfo != null) {
+        keyInfo.setParentObjectID(parentBucketInfo.getObjectID());
+      } else {
+        throw new IOException("ParentKeyInfo for " +
+            "NSSummaryTaskWithLegacy is null");
+      }
+    }
+  }
+}
\ No newline at end of file
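
The parent-resolution logic in setKeyParentID() above reduces to a string
operation on the key path plus an OM table lookup. A standalone sketch of just
the string step, with "/" standing in for OM_KEY_PREFIX and the table lookups
omitted:

    import java.util.Arrays;

    public class ParentKeyNameSketch {
      // Returns the parent directory key name, or null when the key sits
      // directly under the bucket (the bucket is then the parent).
      static String parentKeyName(String keyName) {
        String[] keyPath = keyName.split("/");
        if (keyPath.length > 1) {
          String[] dirs = Arrays.copyOf(keyPath, keyPath.length - 1);
          return String.join("/", dirs) + "/";
        }
        return null;
      }

      public static void main(String[] args) {
        System.out.println(parentKeyName("a/b/c/file1")); // a/b/c/
        System.out.println(parentKeyName("a/b/"));        // a/
        System.out.println(parentKeyName("file1"));       // null
      }
    }
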
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
index 8be665a1df..ee51c318b8 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
@@ -183,11 +183,11 @@ public final class OMMetadataManagerTestUtils {
             .build());
   }
 
-  @SuppressWarnings("checkstyle:parameternumber")
   /**
    * Write a key on OM instance.
   * @throws IOException while writing.
    */
+  @SuppressWarnings("checkstyle:parameternumber")
   public static void writeKeyToOm(OMMetadataManager omMetadataManager,
                                     String key,
                                     String bucket,
@@ -256,6 +256,28 @@ public final class OMMetadataManagerTestUtils {
                     .build());
   }
 
+  /**
+   * Write a directory as a key on the OM instance.
+   * The size does not need to be set.
+   * @throws IOException while writing.
+   */
+  @SuppressWarnings("checkstyle:parameternumber")
+  public static void writeDirToOm(OMMetadataManager omMetadataManager,
+                                  String key,
+                                  String bucket,
+                                  String volume,
+                                  String fileName,
+                                  long objectID,
+                                  long parentObjectId,
+                                  long bucketObjectId,
+                                  long volumeObjectId,
+                                  BucketLayout bucketLayout)
+      throws IOException {
+    writeKeyToOm(omMetadataManager, key, bucket, volume,
+        fileName, objectID, parentObjectId, bucketObjectId,
+        volumeObjectId, 0, bucketLayout);
+  }
+
   public static void writeDirToOm(OMMetadataManager omMetadataManager,
                                   long objectId,
                                   long parentObjectId,
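
A call sketch of the new writeDirToOm() overload, mirroring how the legacy
endpoint test later in this patch writes its directories (the constants are
that test's fixtures). A legacy "directory" is simply a zero-size key whose
name ends with OM_KEY_PREFIX:

    writeDirToOm(reconOMMetadataManager,
        DIR_ONE + OM_KEY_PREFIX,  // trailing "/" marks the key as a directory
        BUCKET_ONE,
        VOL,
        DIR_ONE,
        DIR_ONE_OBJECT_ID,
        PARENT_OBJECT_ID_ZERO,    // parent id is derived by the legacy task
        BUCKET_ONE_OBJECT_ID,
        VOL_OBJECT_ID,
        BucketLayout.LEGACY);
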
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
index 20e423e896..c68bab8735 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
@@ -1,4 +1,4 @@
-/*'
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -367,8 +367,9 @@ public class TestNSSummaryEndpointWithFSO {
     // populate OM DB and reprocess into Recon RocksDB
     populateOMDB();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
-        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager);
-    nSSummaryTaskWithFso.reprocess(reconOMMetadataManager);
+        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
+            reconOMMetadataManager);
+    nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
   }
 
   @Test
@@ -841,7 +842,6 @@ public class TestNSSummaryEndpointWithFSO {
           getBucketLayout());
   }
 
-
   /**
    * Create a new OM Metadata manager instance with one user, one vol, and two
    * buckets.
@@ -1246,4 +1246,4 @@ public class TestNSSummaryEndpointWithFSO {
   private static BucketLayout getBucketLayout() {
     return BucketLayout.FILE_SYSTEM_OPTIMIZED;
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
similarity index 88%
copy from hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
copy to hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
index 20e423e896..ccbdd36195 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
@@ -1,4 +1,4 @@
-/*'
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -51,7 +52,7 @@ import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
-import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO;
+import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithLegacy;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -71,15 +72,16 @@ import java.util.HashSet;
 
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProvider;
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Test for NSSummary REST APIs with FSO.
+ * Test for NSSummary REST APIs with Legacy.
  * We tested on a mini file system with the following setting:
  *                vol
  *             /       \
@@ -101,21 +103,22 @@ import static org.mockito.Mockito.when;
  * This is a test for the Rest APIs only. We have tested NSSummaryTask before,
  * so there is no need to test process() on DB's updates
  */
-public class TestNSSummaryEndpointWithFSO {
+public class TestNSSummaryEndpointWithLegacy {
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   private ReconOMMetadataManager reconOMMetadataManager;
   private NSSummaryEndpoint nsSummaryEndpoint;
+  private OzoneConfiguration conf;
 
   private static final String TEST_PATH_UTILITY =
-          "/vol1/buck1/a/b/c/d/e/file1.txt";
+      "/vol1/buck1/a/b/c/d/e/file1.txt";
   private static final String PARENT_DIR = "vol1/buck1/a/b/c/d/e";
   private static final String[] TEST_NAMES =
-          new String[]{"vol1", "buck1", "a", "b", "c", "d", "e", "file1.txt"};
+      new String[]{"vol1", "buck1", "a", "b", "c", "d", "e", "file1.txt"};
   private static final String TEST_KEY_NAMES = "a/b/c/d/e/file1.txt";
 
-  // Object names in FSO-enabled format
+  // Object names
   private static final String VOL = "vol";
   private static final String VOL_TWO = "vol2";
   private static final String BUCKET_ONE = "bucket1";
@@ -154,6 +157,7 @@ public class TestNSSummaryEndpointWithFSO {
   private static final String DIR_FOUR = "dir4";
   private static final String DIR_FIVE = "dir5";
   // objects IDs
+  private static final long PARENT_OBJECT_ID_ZERO = 0L;
   private static final long VOL_OBJECT_ID = 0L;
   private static final long BUCKET_ONE_OBJECT_ID = 1L;
   private static final long BUCKET_TWO_OBJECT_ID = 2L;
@@ -325,50 +329,52 @@ public class TestNSSummaryEndpointWithFSO {
       KEY_THREE_SIZE + KEY_FOUR_SIZE + KEY_FIVE_SIZE + KEY_SIX_SIZE +
       KEY_EIGHT_SIZE + KEY_NINE_SIZE + KEY_TEN_SIZE + KEY_ELEVEN_SIZE;
   private static final long VOL_DATA_SIZE = KEY_ONE_SIZE + KEY_TWO_SIZE +
-          KEY_THREE_SIZE + KEY_FOUR_SIZE + KEY_FIVE_SIZE + KEY_SIX_SIZE;
+      KEY_THREE_SIZE + KEY_FOUR_SIZE + KEY_FIVE_SIZE + KEY_SIX_SIZE;
 
   private static final long VOL_TWO_DATA_SIZE =
       KEY_EIGHT_SIZE + KEY_NINE_SIZE + KEY_TEN_SIZE + KEY_ELEVEN_SIZE;
 
   private static final long BUCKET_ONE_DATA_SIZE = KEY_ONE_SIZE + KEY_TWO_SIZE +
-          KEY_THREE_SIZE + KEY_SIX_SIZE;
+      KEY_THREE_SIZE + KEY_SIX_SIZE;
 
   private static final long BUCKET_TWO_DATA_SIZE =
-          KEY_FOUR_SIZE + KEY_FIVE_SIZE;
+      KEY_FOUR_SIZE + KEY_FIVE_SIZE;
 
   private static final long DIR_ONE_DATA_SIZE = KEY_TWO_SIZE +
-          KEY_THREE_SIZE + KEY_SIX_SIZE;
+      KEY_THREE_SIZE + KEY_SIX_SIZE;
 
   @Before
   public void setUp() throws Exception {
+    conf = new OzoneConfiguration();
     OMMetadataManager omMetadataManager = initializeNewOmMetadataManager(
-        temporaryFolder.newFolder());
+        temporaryFolder.newFolder(), conf);
     OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
-        getMockOzoneManagerServiceProviderWithFSO();
-    reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
-            temporaryFolder.newFolder());
+        getMockOzoneManagerServiceProvider();
+    reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager, 
+        temporaryFolder.newFolder());
 
     ReconTestInjector reconTestInjector =
-            new ReconTestInjector.Builder(temporaryFolder)
-                    .withReconOm(reconOMMetadataManager)
-                    .withOmServiceProvider(ozoneManagerServiceProvider)
-                    .withReconSqlDb()
-                    .withContainerDB()
-                    .addBinding(OzoneStorageContainerManager.class,
-                            getMockReconSCM())
-                    .addBinding(StorageContainerServiceProvider.class,
-                            mock(StorageContainerServiceProviderImpl.class))
-                    .addBinding(NSSummaryEndpoint.class)
-                    .build();
+        new ReconTestInjector.Builder(temporaryFolder)
+            .withReconOm(reconOMMetadataManager)
+            .withOmServiceProvider(ozoneManagerServiceProvider)
+            .withReconSqlDb()
+            .withContainerDB()
+            .addBinding(OzoneStorageContainerManager.class,
+                getMockReconSCM())
+            .addBinding(StorageContainerServiceProvider.class,
+                mock(StorageContainerServiceProviderImpl.class))
+            .addBinding(NSSummaryEndpoint.class)
+            .build();
     ReconNamespaceSummaryManager reconNamespaceSummaryManager =
         reconTestInjector.getInstance(ReconNamespaceSummaryManager.class);
     nsSummaryEndpoint = reconTestInjector.getInstance(NSSummaryEndpoint.class);
 
     // populate OM DB and reprocess into Recon RocksDB
     populateOMDB();
-    NSSummaryTaskWithFSO nSSummaryTaskWithFso =
-        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager);
-    nSSummaryTaskWithFso.reprocess(reconOMMetadataManager);
+    NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy = 
+        new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager, 
+                                    reconOMMetadataManager, conf);
+    nsSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
   }
 
   @Test
@@ -399,7 +405,7 @@ public class TestNSSummaryEndpointWithFSO {
     // Test volume basics
     Response volResponse = nsSummaryEndpoint.getBasicInfo(VOL_PATH);
     NamespaceSummaryResponse volResponseObj =
-            (NamespaceSummaryResponse) volResponse.getEntity();
+        (NamespaceSummaryResponse) volResponse.getEntity();
     Assert.assertEquals(EntityType.VOLUME, volResponseObj.getEntityType());
     Assert.assertEquals(2, volResponseObj.getNumBucket());
     Assert.assertEquals(4, volResponseObj.getNumTotalDir());
@@ -410,9 +416,9 @@ public class TestNSSummaryEndpointWithFSO {
   public void testGetBasicInfoBucketOne() throws Exception {
     // Test bucket 1's basics
     Response bucketOneResponse =
-            nsSummaryEndpoint.getBasicInfo(BUCKET_ONE_PATH);
+        nsSummaryEndpoint.getBasicInfo(BUCKET_ONE_PATH);
     NamespaceSummaryResponse bucketOneObj =
-            (NamespaceSummaryResponse) bucketOneResponse.getEntity();
+        (NamespaceSummaryResponse) bucketOneResponse.getEntity();
     Assert.assertEquals(EntityType.BUCKET, bucketOneObj.getEntityType());
     Assert.assertEquals(4, bucketOneObj.getNumTotalDir());
     Assert.assertEquals(4, bucketOneObj.getNumTotalKey());
@@ -422,9 +428,9 @@ public class TestNSSummaryEndpointWithFSO {
   public void testGetBasicInfoBucketTwo() throws Exception {
     // Test bucket 2's basics
     Response bucketTwoResponse =
-            nsSummaryEndpoint.getBasicInfo(BUCKET_TWO_PATH);
+        nsSummaryEndpoint.getBasicInfo(BUCKET_TWO_PATH);
     NamespaceSummaryResponse bucketTwoObj =
-            (NamespaceSummaryResponse) bucketTwoResponse.getEntity();
+        (NamespaceSummaryResponse) bucketTwoResponse.getEntity();
     Assert.assertEquals(EntityType.BUCKET, bucketTwoObj.getEntityType());
     Assert.assertEquals(0, bucketTwoObj.getNumTotalDir());
     Assert.assertEquals(2, bucketTwoObj.getNumTotalKey());
@@ -435,7 +441,7 @@ public class TestNSSummaryEndpointWithFSO {
     // Test intermediate directory basics
     Response dirOneResponse = nsSummaryEndpoint.getBasicInfo(DIR_ONE_PATH);
     NamespaceSummaryResponse dirOneObj =
-            (NamespaceSummaryResponse) dirOneResponse.getEntity();
+        (NamespaceSummaryResponse) dirOneResponse.getEntity();
     Assert.assertEquals(EntityType.DIRECTORY, dirOneObj.getEntityType());
     Assert.assertEquals(3, dirOneObj.getNumTotalDir());
     Assert.assertEquals(3, dirOneObj.getNumTotalKey());
@@ -446,7 +452,7 @@ public class TestNSSummaryEndpointWithFSO {
     // Test invalid path
     Response invalidResponse = nsSummaryEndpoint.getBasicInfo(INVALID_PATH);
     NamespaceSummaryResponse invalidObj =
-            (NamespaceSummaryResponse) invalidResponse.getEntity();
+        (NamespaceSummaryResponse) invalidResponse.getEntity();
     Assert.assertEquals(ResponseStatus.PATH_NOT_FOUND,
         invalidObj.getStatus());
   }
@@ -456,7 +462,7 @@ public class TestNSSummaryEndpointWithFSO {
     // Test key
     Response keyResponse = nsSummaryEndpoint.getBasicInfo(KEY_PATH);
     NamespaceSummaryResponse keyResObj =
-            (NamespaceSummaryResponse) keyResponse.getEntity();
+        (NamespaceSummaryResponse) keyResponse.getEntity();
     Assert.assertEquals(EntityType.KEY, keyResObj.getEntityType());
   }
 
@@ -478,47 +484,48 @@ public class TestNSSummaryEndpointWithFSO {
     Assert.assertEquals(VOL_DATA_SIZE, duVol1.getSize());
     Assert.assertEquals(VOL_TWO_DATA_SIZE, duVol2.getSize());
   }
+
   @Test
   public void testDiskUsageVolume() throws Exception {
     // volume level DU
     Response volResponse = nsSummaryEndpoint.getDiskUsage(VOL_PATH,
-            false, false);
+        false, false);
     DUResponse duVolRes = (DUResponse) volResponse.getEntity();
     Assert.assertEquals(2, duVolRes.getCount());
     List<DUResponse.DiskUsage> duData = duVolRes.getDuData();
     // sort based on subpath
     Collections.sort(duData,
-            Comparator.comparing(DUResponse.DiskUsage::getSubpath));
+        Comparator.comparing(DUResponse.DiskUsage::getSubpath));
     DUResponse.DiskUsage duBucket1 = duData.get(0);
     DUResponse.DiskUsage duBucket2 = duData.get(1);
     Assert.assertEquals(BUCKET_ONE_PATH, duBucket1.getSubpath());
     Assert.assertEquals(BUCKET_TWO_PATH, duBucket2.getSubpath());
     Assert.assertEquals(BUCKET_ONE_DATA_SIZE, duBucket1.getSize());
     Assert.assertEquals(BUCKET_TWO_DATA_SIZE, duBucket2.getSize());
-
   }
+
   @Test
   public void testDiskUsageBucket() throws Exception {
     // bucket level DU
     Response bucketResponse = nsSummaryEndpoint.getDiskUsage(BUCKET_ONE_PATH,
-            false, false);
+        false, false);
     DUResponse duBucketResponse = (DUResponse) bucketResponse.getEntity();
     Assert.assertEquals(1, duBucketResponse.getCount());
     DUResponse.DiskUsage duDir1 = duBucketResponse.getDuData().get(0);
     Assert.assertEquals(DIR_ONE_PATH, duDir1.getSubpath());
     Assert.assertEquals(DIR_ONE_DATA_SIZE, duDir1.getSize());
-
   }
+
   @Test
   public void testDiskUsageDir() throws Exception {
     // dir level DU
     Response dirResponse = nsSummaryEndpoint.getDiskUsage(DIR_ONE_PATH,
-            false, false);
+        false, false);
     DUResponse duDirReponse = (DUResponse) dirResponse.getEntity();
     Assert.assertEquals(3, duDirReponse.getCount());
     List<DUResponse.DiskUsage> duSubDir = duDirReponse.getDuData();
     Collections.sort(duSubDir,
-            Comparator.comparing(DUResponse.DiskUsage::getSubpath));
+        Comparator.comparing(DUResponse.DiskUsage::getSubpath));
     DUResponse.DiskUsage duDir2 = duSubDir.get(0);
     DUResponse.DiskUsage duDir3 = duSubDir.get(1);
     DUResponse.DiskUsage duDir4 = duSubDir.get(2);
@@ -530,37 +537,37 @@ public class TestNSSummaryEndpointWithFSO {
 
     Assert.assertEquals(DIR_FOUR_PATH, duDir4.getSubpath());
     Assert.assertEquals(KEY_SIX_SIZE, duDir4.getSize());
-
   }
+
   @Test
   public void testDiskUsageKey() throws Exception {
     // key level DU
     Response keyResponse = nsSummaryEndpoint.getDiskUsage(KEY_PATH,
-            false, false);
+        false, false);
     DUResponse keyObj = (DUResponse) keyResponse.getEntity();
     Assert.assertEquals(0, keyObj.getCount());
     Assert.assertEquals(KEY_FOUR_SIZE, keyObj.getSize());
-
   }
+
   @Test
   public void testDiskUsageUnknown() throws Exception {
     // invalid path check
     Response invalidResponse = nsSummaryEndpoint.getDiskUsage(INVALID_PATH,
-            false, false);
+        false, false);
     DUResponse invalidObj = (DUResponse) invalidResponse.getEntity();
     Assert.assertEquals(ResponseStatus.PATH_NOT_FOUND,
-            invalidObj.getStatus());
+        invalidObj.getStatus());
   }
 
   @Test
   public void testDiskUsageWithReplication() throws Exception {
     setUpMultiBlockKey();
     Response keyResponse = nsSummaryEndpoint.getDiskUsage(MULTI_BLOCK_KEY_PATH,
-            false, true);
+        false, true);
     DUResponse replicaDUResponse = (DUResponse) keyResponse.getEntity();
     Assert.assertEquals(ResponseStatus.OK, replicaDUResponse.getStatus());
     Assert.assertEquals(MULTI_BLOCK_KEY_SIZE_WITH_REPLICA,
-            replicaDUResponse.getSizeWithReplica());
+        replicaDUResponse.getSizeWithReplica());
   }
 
   @Test
@@ -657,29 +664,29 @@ public class TestNSSummaryEndpointWithFSO {
 
     Response bucketRes2 = nsSummaryEndpoint.getQuotaUsage(BUCKET_TWO_PATH);
     QuotaUsageResponse quBucketRes2 =
-            (QuotaUsageResponse) bucketRes2.getEntity();
+        (QuotaUsageResponse) bucketRes2.getEntity();
     Assert.assertEquals(BUCKET_TWO_QUOTA, quBucketRes2.getQuota());
     Assert.assertEquals(BUCKET_TWO_DATA_SIZE, quBucketRes2.getQuotaUsed());
 
     // other level not applicable
     Response naResponse1 = nsSummaryEndpoint.getQuotaUsage(DIR_ONE_PATH);
     QuotaUsageResponse quotaUsageResponse1 =
-            (QuotaUsageResponse) naResponse1.getEntity();
+        (QuotaUsageResponse) naResponse1.getEntity();
     Assert.assertEquals(ResponseStatus.TYPE_NOT_APPLICABLE,
-            quotaUsageResponse1.getResponseCode());
+        quotaUsageResponse1.getResponseCode());
 
     Response naResponse2 = nsSummaryEndpoint.getQuotaUsage(KEY_PATH);
     QuotaUsageResponse quotaUsageResponse2 =
-            (QuotaUsageResponse) naResponse2.getEntity();
+        (QuotaUsageResponse) naResponse2.getEntity();
     Assert.assertEquals(ResponseStatus.TYPE_NOT_APPLICABLE,
-            quotaUsageResponse2.getResponseCode());
+        quotaUsageResponse2.getResponseCode());
 
     // invalid path request
     Response invalidRes = nsSummaryEndpoint.getQuotaUsage(INVALID_PATH);
     QuotaUsageResponse invalidResObj =
-            (QuotaUsageResponse) invalidRes.getEntity();
+        (QuotaUsageResponse) invalidRes.getEntity();
     Assert.assertEquals(ResponseStatus.PATH_NOT_FOUND,
-            invalidResObj.getResponseCode());
+        invalidResObj.getResponseCode());
   }
 
 
@@ -710,23 +717,59 @@ public class TestNSSummaryEndpointWithFSO {
    * Write directories and keys info into OM DB.
    * @throws Exception
    */
+  @SuppressWarnings("checkstyle:MethodLength")
   private void populateOMDB() throws Exception {
     // write all directories
-    writeDirToOm(reconOMMetadataManager, DIR_ONE_OBJECT_ID,
-            BUCKET_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
-            VOL_OBJECT_ID, DIR_ONE);
-    writeDirToOm(reconOMMetadataManager, DIR_TWO_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
-            VOL_OBJECT_ID, DIR_TWO);
-    writeDirToOm(reconOMMetadataManager, DIR_THREE_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
-            VOL_OBJECT_ID, DIR_THREE);
-    writeDirToOm(reconOMMetadataManager, DIR_FOUR_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
-            VOL_OBJECT_ID, DIR_FOUR);
-    writeDirToOm(reconOMMetadataManager, DIR_FIVE_OBJECT_ID,
-            BUCKET_THREE_OBJECT_ID, BUCKET_THREE_OBJECT_ID,
-            VOL_TWO_OBJECT_ID, DIR_FIVE);
+    writeDirToOm(reconOMMetadataManager,
+          (DIR_ONE + OM_KEY_PREFIX),
+          BUCKET_ONE,
+          VOL,
+          DIR_ONE,
+          DIR_ONE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
+          BUCKET_ONE_OBJECT_ID,
+          VOL_OBJECT_ID,
+          getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+          (DIR_ONE + OM_KEY_PREFIX + DIR_TWO + OM_KEY_PREFIX),
+          BUCKET_ONE,
+          VOL,
+          DIR_TWO,
+          DIR_TWO_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
+          BUCKET_ONE_OBJECT_ID,
+          VOL_OBJECT_ID,
+          getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+          (DIR_ONE + OM_KEY_PREFIX + DIR_THREE + OM_KEY_PREFIX),
+          BUCKET_ONE,
+          VOL,
+          DIR_THREE,
+          DIR_THREE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
+          BUCKET_ONE_OBJECT_ID,
+          VOL_OBJECT_ID,
+          getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+          (DIR_ONE + OM_KEY_PREFIX + DIR_FOUR + OM_KEY_PREFIX),
+          BUCKET_ONE,
+          VOL,
+          DIR_FOUR,
+          DIR_FOUR_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
+          BUCKET_ONE_OBJECT_ID,
+          VOL_OBJECT_ID,
+          getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+          (DIR_FIVE + OM_KEY_PREFIX),
+          BUCKET_THREE,
+          VOL_TWO,
+          DIR_FIVE,
+          DIR_FIVE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
+          BUCKET_THREE_OBJECT_ID,
+          VOL_TWO_OBJECT_ID,
+          getBucketLayout());
 
     // write all keys
     writeKeyToOm(reconOMMetadataManager,
@@ -735,7 +778,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL,
           FILE_ONE,
           KEY_ONE_OBJECT_ID,
-          BUCKET_ONE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_ONE_OBJECT_ID,
           VOL_OBJECT_ID,
           KEY_ONE_SIZE,
@@ -746,7 +789,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL,
           FILE_TWO,
           KEY_TWO_OBJECT_ID,
-          DIR_TWO_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_ONE_OBJECT_ID,
           VOL_OBJECT_ID,
           KEY_TWO_SIZE,
@@ -757,7 +800,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL,
           FILE_THREE,
           KEY_THREE_OBJECT_ID,
-          DIR_THREE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_ONE_OBJECT_ID,
           VOL_OBJECT_ID,
           KEY_THREE_SIZE,
@@ -768,7 +811,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL,
           FILE_FOUR,
           KEY_FOUR_OBJECT_ID,
-          BUCKET_TWO_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_TWO_OBJECT_ID,
           VOL_OBJECT_ID,
           KEY_FOUR_SIZE,
@@ -779,7 +822,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL,
           FILE_FIVE,
           KEY_FIVE_OBJECT_ID,
-          BUCKET_TWO_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_TWO_OBJECT_ID,
           VOL_OBJECT_ID,
           KEY_FIVE_SIZE,
@@ -790,7 +833,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL,
           FILE_SIX,
           KEY_SIX_OBJECT_ID,
-          DIR_FOUR_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_ONE_OBJECT_ID,
           VOL_OBJECT_ID,
           KEY_SIX_SIZE,
@@ -801,7 +844,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL_TWO,
           FILE_EIGHT,
           KEY_EIGHT_OBJECT_ID,
-          BUCKET_THREE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_THREE_OBJECT_ID,
           VOL_TWO_OBJECT_ID,
           KEY_EIGHT_SIZE,
@@ -812,7 +855,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL_TWO,
           FILE_NINE,
           KEY_NINE_OBJECT_ID,
-          DIR_FIVE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_THREE_OBJECT_ID,
           VOL_TWO_OBJECT_ID,
           KEY_NINE_SIZE,
@@ -823,7 +866,7 @@ public class TestNSSummaryEndpointWithFSO {
           VOL_TWO,
           FILE_TEN,
           KEY_TEN_OBJECT_ID,
-          DIR_FIVE_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_THREE_OBJECT_ID,
           VOL_TWO_OBJECT_ID,
           KEY_TEN_SIZE,
@@ -834,27 +877,27 @@ public class TestNSSummaryEndpointWithFSO {
           VOL_TWO,
           FILE_ELEVEN,
           KEY_ELEVEN_OBJECT_ID,
-          BUCKET_FOUR_OBJECT_ID,
+          PARENT_OBJECT_ID_ZERO,
           BUCKET_FOUR_OBJECT_ID,
           VOL_TWO_OBJECT_ID,
           KEY_ELEVEN_SIZE,
           getBucketLayout());
   }
 
-
   /**
    * Create a new OM Metadata manager instance with one user, one vol, and two
    * buckets.
    * @throws IOException ioEx
    */
   private static OMMetadataManager initializeNewOmMetadataManager(
-          File omDbDir)
-          throws IOException {
-    OzoneConfiguration omConfiguration = new OzoneConfiguration();
+      File omDbDir, OzoneConfiguration omConfiguration)
+      throws IOException {
     omConfiguration.set(OZONE_OM_DB_DIRS,
-            omDbDir.getAbsolutePath());
+        omDbDir.getAbsolutePath());
+    omConfiguration.set(OMConfigKeys
+        .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
     OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(
-            omConfiguration);
+        omConfiguration);
 
     String volumeKey = omMetadataManager.getVolumeKey(VOL);
     OmVolumeArgs args =
@@ -912,7 +955,7 @@ public class TestNSSummaryEndpointWithFSO {
         .build();
 
     String bucketKey = omMetadataManager.getBucketKey(
-            bucketInfo.getVolumeName(), bucketInfo.getBucketName());
+        bucketInfo.getVolumeName(), bucketInfo.getBucketName());
     String bucketKey2 = omMetadataManager.getBucketKey(
         bucketInfo2.getVolumeName(), bucketInfo2.getBucketName());
     String bucketKey3 = omMetadataManager.getBucketKey(
@@ -939,7 +982,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         MULTI_BLOCK_FILE,
         MULTI_BLOCK_KEY_OBJECT_ID,
-        DIR_ONE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup),
@@ -953,17 +996,17 @@ public class TestNSSummaryEndpointWithFSO {
     BlockID block3 = new BlockID(CONTAINER_THREE_ID, 0L);
 
     OmKeyLocationInfo location1 = new OmKeyLocationInfo.Builder()
-            .setBlockID(block1)
-            .setLength(BLOCK_ONE_LENGTH)
-            .build();
+        .setBlockID(block1)
+        .setLength(BLOCK_ONE_LENGTH)
+        .build();
     OmKeyLocationInfo location2 = new OmKeyLocationInfo.Builder()
-            .setBlockID(block2)
-            .setLength(BLOCK_TWO_LENGTH)
-            .build();
+        .setBlockID(block2)
+        .setLength(BLOCK_TWO_LENGTH)
+        .build();
     OmKeyLocationInfo location3 = new OmKeyLocationInfo.Builder()
-            .setBlockID(block3)
-            .setLength(BLOCK_THREE_LENGTH)
-            .build();
+        .setBlockID(block3)
+        .setLength(BLOCK_THREE_LENGTH)
+        .build();
     locationInfoList.add(location1);
     locationInfoList.add(location2);
     locationInfoList.add(location3);
@@ -1032,11 +1075,11 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         FILE_ONE,
         KEY_ONE_OBJECT_ID,
-        BUCKET_ONE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup1),
        getBucketLayout());
 
     //vol/bucket1/dir1/dir2/file2
     writeKeyToOm(reconOMMetadataManager,
@@ -1045,11 +1088,11 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         FILE_TWO,
         KEY_TWO_OBJECT_ID,
-        DIR_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup2),
        getBucketLayout());
 
     //vol/bucket1/dir1/dir3/file3
     writeKeyToOm(reconOMMetadataManager,
@@ -1058,11 +1101,11 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         FILE_THREE,
         KEY_THREE_OBJECT_ID,
-        DIR_THREE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup1),
        getBucketLayout());
 
     //vol/bucket2/file4
     writeKeyToOm(reconOMMetadataManager,
@@ -1071,7 +1114,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         FILE_FOUR,
         KEY_FOUR_OBJECT_ID,
-        BUCKET_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_TWO_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup2),
@@ -1084,7 +1127,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         FILE_FIVE,
         KEY_FIVE_OBJECT_ID,
-        BUCKET_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_TWO_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup1),
@@ -1097,7 +1140,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         FILE_SIX,
         KEY_SIX_OBJECT_ID,
-        DIR_FOUR_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup2),
@@ -1110,7 +1153,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL,
         FILE_SEVEN,
         KEY_SEVEN_OBJECT_ID,
-        DIR_ONE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         Collections.singletonList(locationInfoGroup1),
@@ -1123,7 +1166,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL_TWO,
         FILE_EIGHT,
         KEY_EIGHT_OBJECT_ID,
-        BUCKET_THREE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_THREE_OBJECT_ID,
         VOL_TWO_OBJECT_ID,
         Collections.singletonList(locationInfoGroup2),
@@ -1136,7 +1179,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL_TWO,
         FILE_NINE,
         KEY_NINE_OBJECT_ID,
-        DIR_FIVE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_THREE_OBJECT_ID,
         VOL_TWO_OBJECT_ID,
         Collections.singletonList(locationInfoGroup1),
@@ -1149,7 +1192,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL_TWO,
         FILE_TEN,
         KEY_TEN_OBJECT_ID,
-        DIR_FIVE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_THREE_OBJECT_ID,
         VOL_TWO_OBJECT_ID,
         Collections.singletonList(locationInfoGroup2),
@@ -1162,7 +1205,7 @@ public class TestNSSummaryEndpointWithFSO {
         VOL_TWO,
         FILE_ELEVEN,
         KEY_ELEVEN_OBJECT_ID,
-        BUCKET_FOUR_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_FOUR_OBJECT_ID,
         VOL_TWO_OBJECT_ID,
         Collections.singletonList(locationInfoGroup1),
@@ -1177,24 +1220,24 @@ public class TestNSSummaryEndpointWithFSO {
    * @return a set of container replica for testing
    */
   private static Set<ContainerReplica> generateMockContainerReplicas(
-          int replicationFactor, ContainerID containerID) {
+      int replicationFactor, ContainerID containerID) {
     Set<ContainerReplica> result = new HashSet<>();
     for (int i = 0; i < replicationFactor; ++i) {
       DatanodeDetails randomDatanode = randomDatanodeDetails();
       ContainerReplica replica = new ContainerReplica.ContainerReplicaBuilder()
-              .setContainerID(containerID)
-              .setContainerState(State.OPEN)
-              .setDatanodeDetails(randomDatanode)
-              .build();
+          .setContainerID(containerID)
+          .setContainerState(State.OPEN)
+          .setDatanodeDetails(randomDatanode)
+          .build();
       result.add(replica);
     }
     return result;
   }
 
   private static ReconStorageContainerManagerFacade getMockReconSCM()
-          throws ContainerNotFoundException {
+      throws ContainerNotFoundException {
     ReconStorageContainerManagerFacade reconSCM =
-            mock(ReconStorageContainerManagerFacade.class);
+        mock(ReconStorageContainerManagerFacade.class);
     ContainerManager containerManager = mock(ContainerManager.class);
 
     // Container 1 is 3-way replicated
@@ -1202,14 +1245,14 @@ public class TestNSSummaryEndpointWithFSO {
     Set<ContainerReplica> containerReplicas1 = generateMockContainerReplicas(
         CONTAINER_ONE_REPLICA_COUNT, containerID1);
     when(containerManager.getContainerReplicas(containerID1))
-            .thenReturn(containerReplicas1);
+        .thenReturn(containerReplicas1);
 
     // Container 2 is under replicated with 2 replica
     ContainerID containerID2 = new ContainerID(CONTAINER_TWO_ID);
     Set<ContainerReplica> containerReplicas2 = generateMockContainerReplicas(
         CONTAINER_TWO_REPLICA_COUNT, containerID2);
     when(containerManager.getContainerReplicas(containerID2))
-            .thenReturn(containerReplicas2);
+        .thenReturn(containerReplicas2);
 
     // Container 3 is over replicated with 4 replica
     ContainerID containerID3 = new ContainerID(CONTAINER_THREE_ID);
@@ -1244,6 +1287,6 @@ public class TestNSSummaryEndpointWithFSO {
   }
 
   private static BucketLayout getBucketLayout() {
-    return BucketLayout.FILE_SYSTEM_OPTIMIZED;
+    return BucketLayout.LEGACY;
   }
-}
+}
\ No newline at end of file
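
One setup detail worth calling out: the legacy test enables
OZONE_OM_ENABLE_FILESYSTEM_PATHS so that LEGACY buckets pass the
isObjectStore(enableFileSystemPaths) check and get summarized; with the flag
off, a legacy bucket would presumably be treated as an object store and
skipped. The wiring, condensed from the test above:

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
    NSSummaryTaskWithLegacy task =
        new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager,
            reconOMMetadataManager, conf);
    task.reprocessWithLegacy(reconOMMetadataManager);
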
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
new file mode 100644
index 0000000000..0c892bd3b3
--- /dev/null
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.recon.ReconConstants;
+import org.apache.hadoop.ozone.recon.ReconTestInjector;
+import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.ClassRule;
+import org.junit.Assert;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProvider;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+
+/**
+ * Test for NSSummaryTask. Create one bucket of each layout
+ * and test process and reprocess. Currently, there is no
+ * support for OBS buckets. Check that the NSSummary
+ * for the OBS bucket is null.
+ */
+@RunWith(Enclosed.class)
+public final class TestNSSummaryTask {
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+  private static OMMetadataManager omMetadataManager;
+  private static ReconOMMetadataManager reconOMMetadataManager;
+  private static NSSummaryTask nSSummaryTask;
+  private static OzoneConfiguration omConfiguration;
+
+  // Object names
+  private static final String VOL = "vol";
+  private static final String BUCKET_ONE = "bucket1";
+  private static final String BUCKET_TWO = "bucket2";
+  private static final String BUCKET_THREE = "bucket3";
+  private static final String KEY_ONE = "file1";
+  private static final String KEY_TWO = "file2";
+  private static final String KEY_THREE = "file3";
+  private static final String KEY_FIVE = "file5";
+  private static final String FILE_ONE = "file1";
+  private static final String FILE_TWO = "file2";
+  private static final String FILE_THREE = "file3";
+  private static final String FILE_FIVE = "file5";
+
+  private static final String TEST_USER = "TestUser";
+
+  private static final long PARENT_OBJECT_ID_ZERO = 0L;
+  private static final long VOL_OBJECT_ID = 0L;
+  private static final long BUCKET_ONE_OBJECT_ID = 1L;
+  private static final long BUCKET_TWO_OBJECT_ID = 2L;
+  private static final long BUCKET_THREE_OBJECT_ID = 4L;
+  private static final long KEY_ONE_OBJECT_ID = 3L;
+  private static final long KEY_TWO_OBJECT_ID = 5L;
+  private static final long KEY_THREE_OBJECT_ID = 8L;
+  private static final long KEY_FIVE_OBJECT_ID = 9L;
+
+  private static final long KEY_ONE_SIZE = 500L;
+  private static final long KEY_TWO_SIZE = 1025L;
+  private static final long KEY_THREE_SIZE =
+      ReconConstants.MAX_FILE_SIZE_UPPER_BOUND - 100L;
+  private static final long KEY_FIVE_SIZE = 100L;
+
+  private TestNSSummaryTask() {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    initializeNewOmMetadataManager(TEMPORARY_FOLDER.newFolder());
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        getMockOzoneManagerServiceProvider();
+    reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
+        TEMPORARY_FOLDER.newFolder());
+
+    ReconTestInjector reconTestInjector =
+        new ReconTestInjector.Builder(TEMPORARY_FOLDER)
+            .withReconOm(reconOMMetadataManager)
+            .withOmServiceProvider(ozoneManagerServiceProvider)
+            .withReconSqlDb()
+            .withContainerDB()
+            .build();
+    reconNamespaceSummaryManager =
+        reconTestInjector.getInstance(ReconNamespaceSummaryManager.class);
+
+    NSSummary nonExistentSummary =
+        reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
+    Assert.assertNull(nonExistentSummary);
+
+    populateOMDB();
+
+    nSSummaryTask = new NSSummaryTask(reconNamespaceSummaryManager,
+        reconOMMetadataManager, omConfiguration);
+  }
+
+  /**
+   * Nested class for testing NSSummaryTask reprocess.
+   */
+  public static class TestReprocess {
+
+    private static NSSummary nsSummaryForBucket1;
+    private static NSSummary nsSummaryForBucket2;
+    private static NSSummary nsSummaryForBucket3;
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+      // Write an NSSummary prior to reprocess and
+      // verify it gets cleaned up afterwards.
+      NSSummary staleNSSummary = new NSSummary();
+      RDBBatchOperation rdbBatchOperation = new RDBBatchOperation();
+      reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation, -1L,
+          staleNSSummary);
+      reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
+
+      // Verify commit
+      Assert.assertNotNull(reconNamespaceSummaryManager.getNSSummary(-1L));
+
+      nSSummaryTask.reprocess(reconOMMetadataManager);
+      Assert.assertNull(reconNamespaceSummaryManager.getNSSummary(-1L));
+
+      nsSummaryForBucket1 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
+      nsSummaryForBucket2 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_TWO_OBJECT_ID);
+      nsSummaryForBucket3 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_THREE_OBJECT_ID);
+      Assert.assertNotNull(nsSummaryForBucket1);
+      Assert.assertNotNull(nsSummaryForBucket2);
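+      // bucket 3 uses OBS layout, which the task skips, so no summary
+      // is written for it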
+      Assert.assertNull(nsSummaryForBucket3);
+    }
+
+    @Test
+    public void testReprocessNSSummaryNull() throws IOException {
+      Assert.assertNull(reconNamespaceSummaryManager.getNSSummary(-1L));
+    }
+
+    @Test
+    public void testReprocessGetFiles() {
+      Assert.assertEquals(1, nsSummaryForBucket1.getNumOfFiles());
+      Assert.assertEquals(1, nsSummaryForBucket2.getNumOfFiles());
+
+      Assert.assertEquals(KEY_ONE_SIZE, nsSummaryForBucket1.getSizeOfFiles());
+      Assert.assertEquals(KEY_TWO_SIZE, nsSummaryForBucket2.getSizeOfFiles());
+    }
+
+    @Test
+    public void testReprocessFileBucketSize() {
+      int[] fileDistBucket1 = nsSummaryForBucket1.getFileSizeBucket();
+      int[] fileDistBucket2 = nsSummaryForBucket2.getFileSizeBucket();
+      Assert.assertEquals(ReconConstants.NUM_OF_BINS, fileDistBucket1.length);
+      Assert.assertEquals(ReconConstants.NUM_OF_BINS, fileDistBucket2.length);
+
+      Assert.assertEquals(1, fileDistBucket1[0]);
+      for (int i = 1; i < ReconConstants.NUM_OF_BINS; ++i) {
+        Assert.assertEquals(0, fileDistBucket1[i]);
+      }
+      Assert.assertEquals(1, fileDistBucket2[1]);
+      for (int i = 0; i < ReconConstants.NUM_OF_BINS; ++i) {
+        if (i == 1) {
+          continue;
+        }
+        Assert.assertEquals(0, fileDistBucket2[i]);
+      }
+    }
+
+  }
+
+  /**
+   * Nested class for testing NSSummaryTask process.
+   */
+  public static class TestProcess {
+
+    private static NSSummary nsSummaryForBucket1;
+    private static NSSummary nsSummaryForBucket2;
+    private static NSSummary nsSummaryForBucket3;
+
+    private static OMDBUpdateEvent keyEvent1;
+    private static OMDBUpdateEvent keyEvent2;
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+      nSSummaryTask.reprocess(reconOMMetadataManager);
+      nSSummaryTask.process(processEventBatch());
+
+      nsSummaryForBucket1 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
+      Assert.assertNotNull(nsSummaryForBucket1);
+      nsSummaryForBucket2 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_TWO_OBJECT_ID);
+      Assert.assertNotNull(nsSummaryForBucket2);
+      nsSummaryForBucket3 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_THREE_OBJECT_ID);
+      Assert.assertNull(nsSummaryForBucket3);
+    }
+
+    private static OMUpdateEventBatch processEventBatch() throws IOException {
+      // put file5 under bucket 2
+      String omPutKey =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_TWO +
+              OM_KEY_PREFIX + FILE_FIVE;
+      OmKeyInfo omPutKeyInfo = buildOmKeyInfo(VOL, BUCKET_TWO, KEY_FIVE,
+          FILE_FIVE, KEY_FIVE_OBJECT_ID, BUCKET_TWO_OBJECT_ID, KEY_FIVE_SIZE);
+      keyEvent1 = new OMDBUpdateEvent.
+          OMUpdateEventBuilder<String, OmKeyInfo>()
+          .setKey(omPutKey)
+          .setValue(omPutKeyInfo)
+          .setTable(omMetadataManager.getKeyTable(getLegacyBucketLayout())
+              .getName())
+          .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT)
+          .build();
+
+      // delete file 1 under bucket 1
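+      // bucket 1 uses FSO layout, so its key is parentObjectId/fileName
+      // rather than the /volume/bucket/key path used for the legacy
+      // bucket above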
+      String omDeleteKey = BUCKET_ONE_OBJECT_ID + OM_KEY_PREFIX + FILE_ONE;
+      OmKeyInfo omDeleteInfo = buildOmKeyInfo(
+          VOL, BUCKET_ONE, KEY_ONE, FILE_ONE,
+          KEY_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID);
+      keyEvent2 = new OMDBUpdateEvent.
+          OMUpdateEventBuilder<String, OmKeyInfo>()
+          .setKey(omDeleteKey)
+          .setValue(omDeleteInfo)
+          .setTable(omMetadataManager.getKeyTable(getFSOBucketLayout())
+              .getName())
+          .setAction(OMDBUpdateEvent.OMDBUpdateAction.DELETE)
+          .build();
+
+      OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(
+          new ArrayList<OMDBUpdateEvent>() {{
+              add(keyEvent1);
+              add(keyEvent2);
+          }});
+
+      return omUpdateEventBatch;
+    }
+
+    @Test
+    public void testProcessUpdateFileSize() throws IOException {
+      // file 1 is gone, so bucket 1 is empty now
+      Assert.assertNotNull(nsSummaryForBucket1);
+      Assert.assertEquals(0, nsSummaryForBucket1.getNumOfFiles());
+
+      Set<Long> childDirBucket1 = nsSummaryForBucket1.getChildDir();
+      Assert.assertEquals(0, childDirBucket1.size());
+    }
+
+    @Test
+    public void testProcessBucket() throws IOException {
+      // file 5 is added under bucket 2, so bucket 2 has 2 keys now
+      Assert.assertNotNull(nsSummaryForBucket2);
+      Assert.assertEquals(2, nsSummaryForBucket2.getNumOfFiles());
+      // key 2 + key 5
+      Assert.assertEquals(KEY_TWO_SIZE + KEY_FIVE_SIZE,
+          nsSummaryForBucket2.getSizeOfFiles());
+
+      int[] fileSizeDist = nsSummaryForBucket2.getFileSizeBucket();
+      Assert.assertEquals(ReconConstants.NUM_OF_BINS, fileSizeDist.length);
+      // file 5 (100L) falls in the first bin
+      Assert.assertEquals(1, fileSizeDist[0]);
+      // file 2 (1025L) falls in the second bin
+      Assert.assertEquals(1, fileSizeDist[1]);
+      for (int i = 2; i < ReconConstants.NUM_OF_BINS; ++i) {
+        Assert.assertEquals(0, fileSizeDist[i]);
+      }
+    }
+  }
+
+  /**
+   * Build a key info for put/update action.
+   * @param volume         volume name
+   * @param bucket         bucket name
+   * @param key            key name
+   * @param fileName       file name
+   * @param objectID       object ID
+   * @param parentObjectId parent object ID
+   * @param dataSize       file size
+   * @return the KeyInfo
+   */
+  private static OmKeyInfo buildOmKeyInfo(String volume,
+                                          String bucket,
+                                          String key,
+                                          String fileName,
+                                          long objectID,
+                                          long parentObjectId,
+                                          long dataSize) {
+    return new OmKeyInfo.Builder()
+        .setBucketName(bucket)
+        .setVolumeName(volume)
+        .setKeyName(key)
+        .setFileName(fileName)
+        .setReplicationConfig(
+            StandaloneReplicationConfig.getInstance(
+                HddsProtos.ReplicationFactor.ONE))
+        .setObjectID(objectID)
+        .setParentObjectID(parentObjectId)
+        .setDataSize(dataSize)
+        .build();
+  }
+
+  /**
+   * Build a key info for delete action.
+   * @param volume         volume name
+   * @param bucket         bucket name
+   * @param key            key name
+   * @param fileName       file name
+   * @param objectID       object ID
+   * @param parentObjectId parent object ID
+   * @return the KeyInfo
+   */
+  private static OmKeyInfo buildOmKeyInfo(String volume,
+                                          String bucket,
+                                          String key,
+                                          String fileName,
+                                          long objectID,
+                                          long parentObjectId) {
+    return new OmKeyInfo.Builder()
+        .setBucketName(bucket)
+        .setVolumeName(volume)
+        .setKeyName(key)
+        .setFileName(fileName)
+        .setReplicationConfig(
+            StandaloneReplicationConfig.getInstance(
+                HddsProtos.ReplicationFactor.ONE))
+        .setObjectID(objectID)
+        .setParentObjectID(parentObjectId)
+        .build();
+  }
+
+  /**
+   * Populate OMDB with the following structure:
+   *               vol
+   *          /     |      \
+   *    bucket1  bucket2  bucket3
+   *       |        |        |
+   *     file1    file2    file3
+   *
+   * @throws IOException
+   */
+  private static void populateOMDB() throws IOException {
+    // Bucket1 FSO layout
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_ONE,
+        BUCKET_ONE,
+        VOL,
+        FILE_ONE,
+        KEY_ONE_OBJECT_ID,
+        BUCKET_ONE_OBJECT_ID,
+        BUCKET_ONE_OBJECT_ID,
+        VOL_OBJECT_ID,
+        KEY_ONE_SIZE,
+        getFSOBucketLayout());
+
+    // Bucket2 Legacy layout
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_TWO,
+        BUCKET_TWO,
+        VOL,
+        FILE_TWO,
+        KEY_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        BUCKET_TWO_OBJECT_ID,
+        VOL_OBJECT_ID,
+        KEY_TWO_SIZE,
+        getLegacyBucketLayout());
+
+    // Bucket3 OBS layout
+    writeKeyToOm(reconOMMetadataManager,
+        KEY_THREE,
+        BUCKET_THREE,
+        VOL,
+        FILE_THREE,
+        KEY_THREE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        BUCKET_THREE_OBJECT_ID,
+        VOL_OBJECT_ID,
+        KEY_THREE_SIZE,
+        getOBSBucketLayout());
+  }
+
+  /**
+   * Create a new OM Metadata manager instance with one user, one volume,
+   * and three buckets: bucket1 with FSO layout, bucket2 with Legacy layout,
+   * and bucket3 with OBS layout.
+   * @throws IOException ioEx
+   */
+  private static void initializeNewOmMetadataManager(
+      File omDbDir)
+      throws IOException {
+    omConfiguration = new OzoneConfiguration();
+    omConfiguration.set(OZONE_OM_DB_DIRS,
+        omDbDir.getAbsolutePath());
+    omConfiguration.set(OMConfigKeys
+        .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
+    omMetadataManager = new OmMetadataManagerImpl(
+        omConfiguration);
+
+    String volumeKey = omMetadataManager.getVolumeKey(VOL);
+    OmVolumeArgs args =
+        OmVolumeArgs.newBuilder()
+            .setObjectID(VOL_OBJECT_ID)
+            .setVolume(VOL)
+            .setAdminName(TEST_USER)
+            .setOwnerName(TEST_USER)
+            .build();
+    omMetadataManager.getVolumeTable().put(volumeKey, args);
+
+    OmBucketInfo bucketInfo1 = OmBucketInfo.newBuilder()
+        .setVolumeName(VOL)
+        .setBucketName(BUCKET_ONE)
+        .setObjectID(BUCKET_ONE_OBJECT_ID)
+        .setBucketLayout(getFSOBucketLayout())
+        .build();
+
+    OmBucketInfo bucketInfo2 = OmBucketInfo.newBuilder()
+        .setVolumeName(VOL)
+        .setBucketName(BUCKET_TWO)
+        .setObjectID(BUCKET_TWO_OBJECT_ID)
+        .setBucketLayout(getLegacyBucketLayout())
+        .build();
+
+    OmBucketInfo bucketInfo3 = OmBucketInfo.newBuilder()
+        .setVolumeName(VOL)
+        .setBucketName(BUCKET_THREE)
+        .setObjectID(BUCKET_THREE_OBJECT_ID)
+        .setBucketLayout(getOBSBucketLayout())
+        .build();
+
+    String bucketKey = omMetadataManager.getBucketKey(
+        bucketInfo1.getVolumeName(), bucketInfo1.getBucketName());
+    String bucketKey2 = omMetadataManager.getBucketKey(
+        bucketInfo2.getVolumeName(), bucketInfo2.getBucketName());
+    String bucketKey3 = omMetadataManager.getBucketKey(
+        bucketInfo3.getVolumeName(), bucketInfo3.getBucketName());
+
+    omMetadataManager.getBucketTable().put(bucketKey, bucketInfo1);
+    omMetadataManager.getBucketTable().put(bucketKey2, bucketInfo2);
+    omMetadataManager.getBucketTable().put(bucketKey3, bucketInfo3);
+  }
+
+  private static BucketLayout getFSOBucketLayout() {
+    return BucketLayout.FILE_SYSTEM_OPTIMIZED;
+  }
+
+  private static BucketLayout getLegacyBucketLayout() {
+    return BucketLayout.LEGACY;
+  }
+
+  private static BucketLayout getOBSBucketLayout() {
+    return BucketLayout.OBJECT_STORE;
+  }
+}
\ No newline at end of file
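
For reviewers skimming the patch, a minimal sketch of how the nested test
classes above drive the combined task. The constructor and method names are
the ones exercised by the tests; the surrounding wiring is illustrative only:

    // Sketch only -- mirrors the flow in TestNSSummaryTask above.
    NSSummaryTask task = new NSSummaryTask(
        reconNamespaceSummaryManager,  // Recon's namespace summary store
        reconOMMetadataManager,        // snapshot of the OM metadata DB
        omConfiguration);              // carries ozone.om.enable.filesystem.paths

    // Full rebuild from the OM snapshot: FSO and Legacy buckets are
    // summarized, OBS buckets are skipped.
    task.reprocess(reconOMMetadataManager);

    // Incremental update from a batch of OM DB update events.
    task.process(omUpdateEventBatch);
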
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
index 76a05b5553..6b6b831c06 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
@@ -134,7 +134,6 @@ public final class TestNSSummaryTaskWithFSO {
     reconNamespaceSummaryManager =
             reconTestInjector.getInstance(ReconNamespaceSummaryManager.class);
 
-
     NSSummary nonExistentSummary =
             reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
     Assert.assertNull(nonExistentSummary);
@@ -142,7 +141,7 @@ public final class TestNSSummaryTaskWithFSO {
     populateOMDB();
 
     nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
-        reconNamespaceSummaryManager);
+        reconNamespaceSummaryManager, reconOMMetadataManager);
   }
 
   /**
@@ -165,7 +164,11 @@ public final class TestNSSummaryTaskWithFSO {
 
       // Verify commit
       Assert.assertNotNull(reconNamespaceSummaryManager.getNSSummary(-1L));
-      nSSummaryTaskWithFso.reprocess(reconOMMetadataManager);
+
+      // reinit Recon RocksDB's namespace CF.
+      reconNamespaceSummaryManager.clearNSSummaryTable();
+
+      nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
       Assert.assertNull(reconNamespaceSummaryManager.getNSSummary(-1L));
 
       nsSummaryForBucket1 =
@@ -273,8 +276,8 @@ public final class TestNSSummaryTaskWithFSO {
     private static OMDBUpdateEvent keyEvent7;
     @BeforeClass
     public static void setUp() throws IOException {
-      nSSummaryTaskWithFso.reprocess(reconOMMetadataManager);
-      nSSummaryTaskWithFso.process(processEventBatch());
+      nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
+      nSSummaryTaskWithFso.processWithFSO(processEventBatch());
     }
 
     private static OMUpdateEventBatch processEventBatch() throws IOException {
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
similarity index 68%
copy from hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
copy to hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
index 76a05b5553..332d88238a 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
@@ -19,18 +19,22 @@
 package org.apache.hadoop.ozone.recon.tasks;
 
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.recon.ReconConstants;
 import org.apache.hadoop.ozone.recon.ReconTestInjector;
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
-import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.ClassRule;
@@ -39,23 +43,24 @@ import org.junit.experimental.runners.Enclosed;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
-import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProvider;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 
 /**
- * Test for NSSummaryTaskWithFSO.
+ * Test for NSSummaryTaskWithLegacy.
  */
 @RunWith(Enclosed.class)
-public final class TestNSSummaryTaskWithFSO {
+public final class TestNSSummaryTaskWithLegacy {
 
   @ClassRule
   public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@@ -63,9 +68,10 @@ public final class TestNSSummaryTaskWithFSO {
   private static ReconNamespaceSummaryManager reconNamespaceSummaryManager;
   private static OMMetadataManager omMetadataManager;
   private static ReconOMMetadataManager reconOMMetadataManager;
-  private static NSSummaryTaskWithFSO nSSummaryTaskWithFso;
+  private static NSSummaryTaskWithLegacy nSSummaryTaskWithLegacy;
+  private static OzoneConfiguration omConfiguration;
 
-  // Object names in FSO-enabled format
+  // Object names
   private static final String VOL = "vol";
   private static final String BUCKET_ONE = "bucket1";
   private static final String BUCKET_TWO = "bucket2";
@@ -86,6 +92,9 @@ public final class TestNSSummaryTaskWithFSO {
   private static final String DIR_FOUR = "dir4";
   private static final String DIR_FIVE = "dir5";
 
+  private static final String TEST_USER = "TestUser";
+
+  private static final long PARENT_OBJECT_ID_ZERO = 0L;
   private static final long VOL_OBJECT_ID = 0L;
   private static final long BUCKET_ONE_OBJECT_ID = 1L;
   private static final long BUCKET_TWO_OBJECT_ID = 2L;
@@ -104,7 +113,7 @@ public final class TestNSSummaryTaskWithFSO {
   private static final long KEY_TWO_OLD_SIZE = 1025L;
   private static final long KEY_TWO_UPDATE_SIZE = 1023L;
   private static final long KEY_THREE_SIZE =
-          ReconConstants.MAX_FILE_SIZE_UPPER_BOUND - 100L;
+      ReconConstants.MAX_FILE_SIZE_UPPER_BOUND - 100L;
   private static final long KEY_FOUR_SIZE = 2050L;
   private static final long KEY_FIVE_SIZE = 100L;
 
@@ -112,17 +121,16 @@ public final class TestNSSummaryTaskWithFSO {
   private static Set<Long> bucketTwoAns = new HashSet<>();
   private static Set<Long> dirOneAns = new HashSet<>();
 
-  private TestNSSummaryTaskWithFSO() {
+  private TestNSSummaryTaskWithLegacy() {
   }
 
   @BeforeClass
   public static void setUp() throws Exception {
-    omMetadataManager = initializeNewOmMetadataManager(
-            TEMPORARY_FOLDER.newFolder());
-    OzoneManagerServiceProvider ozoneManagerServiceProvider =
-        getMockOzoneManagerServiceProviderWithFSO();
+    initializeNewOmMetadataManager(TEMPORARY_FOLDER.newFolder());
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        getMockOzoneManagerServiceProvider();
     reconOMMetadataManager = getTestReconOmMetadataManager(omMetadataManager,
-            TEMPORARY_FOLDER.newFolder());
+        TEMPORARY_FOLDER.newFolder());
 
     ReconTestInjector reconTestInjector =
         new ReconTestInjector.Builder(TEMPORARY_FOLDER)
@@ -132,21 +140,21 @@ public final class TestNSSummaryTaskWithFSO {
             .withContainerDB()
             .build();
     reconNamespaceSummaryManager =
-            reconTestInjector.getInstance(ReconNamespaceSummaryManager.class);
-
+        reconTestInjector.getInstance(ReconNamespaceSummaryManager.class);
 
     NSSummary nonExistentSummary =
-            reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
+        reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
     Assert.assertNull(nonExistentSummary);
 
     populateOMDB();
 
-    nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
-        reconNamespaceSummaryManager);
+    nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
+        reconNamespaceSummaryManager,
+        reconOMMetadataManager, omConfiguration);
   }
 
   /**
-   * Nested class for testing NSSummaryTaskWithFSO reprocess.
+   * Nested class for testing NSSummaryTaskWithLegacy reprocess.
    */
   public static class TestReprocess {
 
@@ -165,7 +173,11 @@ public final class TestNSSummaryTaskWithFSO {
 
       // Verify commit
       Assert.assertNotNull(reconNamespaceSummaryManager.getNSSummary(-1L));
-      nSSummaryTaskWithFso.reprocess(reconOMMetadataManager);
+
+      // reinit Recon RocksDB's namespace CF.
+      reconNamespaceSummaryManager.clearNSSummaryTable();
+
+      nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
       Assert.assertNull(reconNamespaceSummaryManager.getNSSummary(-1L));
 
       nsSummaryForBucket1 =
@@ -189,7 +201,7 @@ public final class TestNSSummaryTaskWithFSO {
       Assert.assertEquals(KEY_ONE_SIZE, nsSummaryForBucket1.getSizeOfFiles());
       Assert.assertEquals(KEY_TWO_OLD_SIZE + KEY_FOUR_SIZE,
           nsSummaryForBucket2.getSizeOfFiles());
-    } 
+    }
 
     @Test
     public void testReprocessFileBucketSize() {
@@ -256,14 +268,19 @@ public final class TestNSSummaryTaskWithFSO {
       Assert.assertEquals(0, nsSummaryForBucket2.getDirName().length());
       // check dirName is correctly written
       Assert.assertEquals(DIR_ONE, nsSummaryInDir1.getDirName());
-      Assert.assertEquals(DIR_TWO, nsSummaryInDir2.getDirName());
+      Assert.assertEquals(DIR_ONE + OM_KEY_PREFIX + DIR_TWO,
+          nsSummaryInDir2.getDirName());
     }
   }
 
   /**
-   * Nested class for testing NSSummaryTaskWithFSO process.
+   * Nested class for testing NSSummaryTaskWithLegacy process.
    */
   public static class TestProcess {
+
+    private static NSSummary nsSummaryForBucket1;
+    private static NSSummary nsSummaryForBucket2;
+
     private static OMDBUpdateEvent keyEvent1;
     private static OMDBUpdateEvent keyEvent2;
     private static OMDBUpdateEvent keyEvent3;
@@ -271,16 +288,26 @@ public final class TestNSSummaryTaskWithFSO {
     private static OMDBUpdateEvent keyEvent5;
     private static OMDBUpdateEvent keyEvent6;
     private static OMDBUpdateEvent keyEvent7;
+
     @BeforeClass
     public static void setUp() throws IOException {
-      nSSummaryTaskWithFso.reprocess(reconOMMetadataManager);
-      nSSummaryTaskWithFso.process(processEventBatch());
+      nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
+      nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch());
+
+      nsSummaryForBucket1 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
+      Assert.assertNotNull(nsSummaryForBucket1);
+      nsSummaryForBucket2 =
+          reconNamespaceSummaryManager.getNSSummary(BUCKET_TWO_OBJECT_ID);
+      Assert.assertNotNull(nsSummaryForBucket2);
     }
 
     private static OMUpdateEventBatch processEventBatch() throws IOException {
-      // Events for keyTable change:
       // put file5 under bucket 2
-      String omPutKey = BUCKET_TWO_OBJECT_ID + OM_KEY_PREFIX + FILE_FIVE;
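+      // legacy key table keys are full /volume/bucket/key paths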
+      String omPutKey =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_TWO +
+              OM_KEY_PREFIX + FILE_FIVE;
       OmKeyInfo omPutKeyInfo = buildOmKeyInfo(VOL, BUCKET_TWO, KEY_FIVE,
           FILE_FIVE, KEY_FIVE_OBJECT_ID, BUCKET_TWO_OBJECT_ID, KEY_FIVE_SIZE);
       keyEvent1 = new OMDBUpdateEvent.
@@ -293,10 +320,13 @@ public final class TestNSSummaryTaskWithFSO {
           .build();
 
       // delete file 1 under bucket 1
-      String omDeleteKey = BUCKET_ONE_OBJECT_ID + OM_KEY_PREFIX + FILE_ONE;
+      String omDeleteKey =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_ONE +
+              OM_KEY_PREFIX + FILE_ONE;
       OmKeyInfo omDeleteInfo = buildOmKeyInfo(
-          VOL, BUCKET_ONE, KEY_ONE, FILE_ONE,
-          KEY_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID);
+          VOL, BUCKET_ONE, KEY_ONE,
+          FILE_ONE, KEY_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID);
       keyEvent2 = new OMDBUpdateEvent.
           OMUpdateEventBuilder<String, OmKeyInfo>()
           .setKey(omDeleteKey)
@@ -307,7 +337,10 @@ public final class TestNSSummaryTaskWithFSO {
           .build();
 
       // update file 2's size under bucket 2
-      String omUpdateKey = BUCKET_TWO_OBJECT_ID + OM_KEY_PREFIX + FILE_TWO;
+      String omUpdateKey =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_TWO +
+              OM_KEY_PREFIX + FILE_TWO;
       OmKeyInfo omOldInfo = buildOmKeyInfo(
           VOL, BUCKET_TWO, KEY_TWO, FILE_TWO,
           KEY_TWO_OBJECT_ID, BUCKET_TWO_OBJECT_ID, KEY_TWO_OLD_SIZE);
@@ -324,60 +357,77 @@ public final class TestNSSummaryTaskWithFSO {
           .setAction(OMDBUpdateEvent.OMDBUpdateAction.UPDATE)
           .build();
 
-      // Events for DirectoryTable change:
       // add dir 4 under bucket 1
-      String omDirPutKey1 = BUCKET_ONE_OBJECT_ID + OM_KEY_PREFIX + DIR_FOUR;
-      OmDirectoryInfo omDirPutValue1 = buildOmDirInfo(DIR_FOUR,
-          DIR_FOUR_OBJECT_ID, BUCKET_ONE_OBJECT_ID);
+      String omDirPutKey1 =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_ONE +
+              OM_KEY_PREFIX + DIR_FOUR + OM_KEY_PREFIX;
+      OmKeyInfo omDirPutValue1 = buildOmDirKeyInfo(VOL, BUCKET_ONE,
+          (DIR_FOUR + OM_KEY_PREFIX), DIR_FOUR,
+          DIR_FOUR_OBJECT_ID);
       keyEvent4 = new OMDBUpdateEvent.
-          OMUpdateEventBuilder<String, OmDirectoryInfo>()
+          OMUpdateEventBuilder<String, OmKeyInfo>()
           .setKey(omDirPutKey1)
           .setValue(omDirPutValue1)
           .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT)
-          .setTable(omMetadataManager.getDirectoryTable().getName())
+          .setTable(omMetadataManager.getKeyTable(getBucketLayout()).getName())
           .build();
 
       // add dir 5 under bucket 2
-      String omDirPutKey2 = BUCKET_TWO_OBJECT_ID + OM_KEY_PREFIX + DIR_FIVE;
-      OmDirectoryInfo omDirPutValue2 = buildOmDirInfo(DIR_FIVE,
-          DIR_FIVE_OBJECT_ID, BUCKET_TWO_OBJECT_ID);
+      String omDirPutKey2 =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_TWO +
+              OM_KEY_PREFIX + DIR_FIVE + OM_KEY_PREFIX;
+      OmKeyInfo omDirPutValue2 = buildOmDirKeyInfo(VOL, BUCKET_TWO,
+          (DIR_FIVE + OM_KEY_PREFIX), DIR_FIVE,
+          DIR_FIVE_OBJECT_ID);
       keyEvent5 = new OMDBUpdateEvent.
-          OMUpdateEventBuilder<String, OmDirectoryInfo>()
+          OMUpdateEventBuilder<String, OmKeyInfo>()
           .setKey(omDirPutKey2)
           .setValue(omDirPutValue2)
           .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT)
-          .setTable(omMetadataManager.getDirectoryTable().getName())
+          .setTable(omMetadataManager.getKeyTable(getBucketLayout()).getName())
           .build();
 
       // delete dir 3 under dir 1
-      String omDirDeleteKey = DIR_ONE_OBJECT_ID + OM_KEY_PREFIX + DIR_THREE;
-      OmDirectoryInfo omDirDeleteValue = buildOmDirInfo(DIR_THREE,
-          DIR_THREE_OBJECT_ID, DIR_ONE_OBJECT_ID);
+      String omDirDeleteKey =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_ONE +
+              OM_KEY_PREFIX + DIR_ONE +
+              OM_KEY_PREFIX + DIR_THREE + OM_KEY_PREFIX;
+      OmKeyInfo omDirDeleteValue = buildOmKeyInfo(VOL, BUCKET_ONE,
+          (DIR_ONE + OM_KEY_PREFIX + DIR_THREE + OM_KEY_PREFIX),
+          DIR_THREE, DIR_THREE_OBJECT_ID, DIR_ONE_OBJECT_ID);
       keyEvent6 = new OMDBUpdateEvent.
-          OMUpdateEventBuilder<String, OmDirectoryInfo>()
+          OMUpdateEventBuilder<String, OmKeyInfo>()
           .setKey(omDirDeleteKey)
           .setValue(omDirDeleteValue)
           .setAction(OMDBUpdateEvent.OMDBUpdateAction.DELETE)
-          .setTable(omMetadataManager.getDirectoryTable().getName())
+          .setTable(omMetadataManager.getKeyTable(getBucketLayout()).getName())
           .build();
 
       // rename dir1
-      String omDirUpdateKey = BUCKET_ONE_OBJECT_ID + OM_KEY_PREFIX + DIR_ONE;
-      OmDirectoryInfo omDirOldValue = buildOmDirInfo(DIR_ONE,
-          DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID);
-      OmDirectoryInfo omDirUpdateValue = buildOmDirInfo(DIR_ONE_RENAME,
-          DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID);
+      String omDirUpdateKey =
+          OM_KEY_PREFIX + VOL +
+              OM_KEY_PREFIX + BUCKET_ONE +
+              OM_KEY_PREFIX + DIR_ONE + OM_KEY_PREFIX;
+      OmKeyInfo omDirOldValue = buildOmDirKeyInfo(VOL, BUCKET_ONE,
+          (DIR_ONE + OM_KEY_PREFIX), DIR_ONE,
+          DIR_ONE_OBJECT_ID);
+      OmKeyInfo omDirUpdateValue = buildOmDirKeyInfo(VOL, BUCKET_ONE,
+          (DIR_ONE_RENAME + OM_KEY_PREFIX), DIR_ONE_RENAME,
+          DIR_ONE_OBJECT_ID);
       keyEvent7 = new OMDBUpdateEvent.
-          OMUpdateEventBuilder<String, OmDirectoryInfo>()
+          OMUpdateEventBuilder<String, OmKeyInfo>()
           .setKey(omDirUpdateKey)
           .setValue(omDirUpdateValue)
           .setOldValue(omDirOldValue)
           .setAction(OMDBUpdateEvent.OMDBUpdateAction.UPDATE)
-          .setTable(omMetadataManager.getDirectoryTable().getName())
+          .setTable(omMetadataManager.getKeyTable(getBucketLayout()).getName())
           .build();
 
       OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(
-              new ArrayList<OMDBUpdateEvent>() {{
+          new ArrayList<OMDBUpdateEvent>() {{
               add(keyEvent1);
               add(keyEvent2);
               add(keyEvent3);
@@ -392,8 +442,6 @@ public final class TestNSSummaryTaskWithFSO {
 
     @Test
     public void testProcessUpdateFileSize() throws IOException {
-      NSSummary nsSummaryForBucket1 =
-          reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
       // file 1 is gone, so bucket 1 is empty now
       Assert.assertNotNull(nsSummaryForBucket1);
       Assert.assertEquals(0, nsSummaryForBucket1.getNumOfFiles());
@@ -409,8 +457,6 @@ public final class TestNSSummaryTaskWithFSO {
 
     @Test
     public void testProcessBucket() throws IOException {
-      NSSummary nsSummaryForBucket2 =
-          reconNamespaceSummaryManager.getNSSummary(BUCKET_TWO_OBJECT_ID);
       // file 5 is added under bucket 2, so bucket 2 has 3 keys now
       // file 2 is updated with new datasize,
       // so file size dist for bucket 2 should be updated
@@ -506,26 +552,43 @@ public final class TestNSSummaryTaskWithFSO {
                                           long objectID,
                                           long parentObjectId) {
     return new OmKeyInfo.Builder()
-            .setBucketName(bucket)
-            .setVolumeName(volume)
-            .setKeyName(key)
-            .setFileName(fileName)
-            .setReplicationConfig(
-                    StandaloneReplicationConfig.getInstance(
-                            HddsProtos.ReplicationFactor.ONE))
-            .setObjectID(objectID)
-            .setParentObjectID(parentObjectId)
-            .build();
+        .setBucketName(bucket)
+        .setVolumeName(volume)
+        .setKeyName(key)
+        .setFileName(fileName)
+        .setReplicationConfig(
+            StandaloneReplicationConfig.getInstance(
+                HddsProtos.ReplicationFactor.ONE))
+        .setObjectID(objectID)
+        .setParentObjectID(parentObjectId)
+        .build();
   }
 
-  private static OmDirectoryInfo buildOmDirInfo(String dirName,
-                                                long objectId,
-                                                long parentObjectId) {
-    return new OmDirectoryInfo.Builder()
-            .setName(dirName)
-            .setObjectID(objectId)
-            .setParentObjectID(parentObjectId)
-            .build();
+  /**
+   * Build a directory as key info for put/update action.
+   * We don't need to set size.
+   * @param volume volume name
+   * @param bucket bucket name
+   * @param key key name
+   * @param fileName file name
+   * @param objectID object ID
+   * @return the KeyInfo
+   */
+  private static OmKeyInfo buildOmDirKeyInfo(String volume,
+                                             String bucket,
+                                             String key,
+                                             String fileName,
+                                             long objectID) {
+    return new OmKeyInfo.Builder()
+        .setBucketName(bucket)
+        .setVolumeName(volume)
+        .setKeyName(key)
+        .setFileName(fileName)
+        .setReplicationConfig(
+            StandaloneReplicationConfig.getInstance(
+                HddsProtos.ReplicationFactor.ONE))
+        .setObjectID(objectID)
+        .build();
   }
 
   /**
@@ -549,7 +612,7 @@ public final class TestNSSummaryTaskWithFSO {
         VOL,
         FILE_ONE,
         KEY_ONE_OBJECT_ID,
-        BUCKET_ONE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         KEY_ONE_SIZE,
@@ -560,7 +623,7 @@ public final class TestNSSummaryTaskWithFSO {
         VOL,
         FILE_TWO,
         KEY_TWO_OBJECT_ID,
-        BUCKET_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_TWO_OBJECT_ID,
         VOL_OBJECT_ID,
         KEY_TWO_OLD_SIZE,
@@ -571,7 +634,7 @@ public final class TestNSSummaryTaskWithFSO {
         VOL,
         FILE_THREE,
         KEY_THREE_OBJECT_ID,
-        DIR_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_ONE_OBJECT_ID,
         VOL_OBJECT_ID,
         KEY_THREE_SIZE,
@@ -582,23 +645,96 @@ public final class TestNSSummaryTaskWithFSO {
         VOL,
         FILE_FOUR,
         KEY_FOUR_OBJECT_ID,
-        BUCKET_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
         BUCKET_TWO_OBJECT_ID,
         VOL_OBJECT_ID,
         KEY_FOUR_SIZE,
         getBucketLayout());
-    writeDirToOm(reconOMMetadataManager, DIR_ONE_OBJECT_ID,
-            BUCKET_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
-            VOL_OBJECT_ID, DIR_ONE);
-    writeDirToOm(reconOMMetadataManager, DIR_TWO_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
-            VOL_OBJECT_ID, DIR_TWO);
-    writeDirToOm(reconOMMetadataManager, DIR_THREE_OBJECT_ID,
-            DIR_ONE_OBJECT_ID, BUCKET_ONE_OBJECT_ID,
-            VOL_OBJECT_ID, DIR_THREE);
+
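+    // in Legacy buckets, directories are key table entries whose key
+    // names carry a trailing OM_KEY_PREFIX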
+    writeDirToOm(reconOMMetadataManager,
+        (DIR_ONE + OM_KEY_PREFIX),
+        BUCKET_ONE,
+        VOL,
+        DIR_ONE,
+        DIR_ONE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        BUCKET_ONE_OBJECT_ID,
+        VOL_OBJECT_ID,
+        getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+        (DIR_ONE + OM_KEY_PREFIX +
+            DIR_TWO + OM_KEY_PREFIX),
+        BUCKET_ONE,
+        VOL,
+        DIR_TWO,
+        DIR_TWO_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        BUCKET_ONE_OBJECT_ID,
+        VOL_OBJECT_ID,
+        getBucketLayout());
+    writeDirToOm(reconOMMetadataManager,
+        (DIR_ONE + OM_KEY_PREFIX +
+            DIR_THREE + OM_KEY_PREFIX),
+        BUCKET_ONE,
+        VOL,
+        DIR_THREE,
+        DIR_THREE_OBJECT_ID,
+        PARENT_OBJECT_ID_ZERO,
+        BUCKET_ONE_OBJECT_ID,
+        VOL_OBJECT_ID,
+        getBucketLayout());
+  }
+
+  /**
+   * Create a new OM Metadata manager instance with one user, one volume,
+   * and two buckets.
+   * @throws IOException ioEx
+   */
+  private static void initializeNewOmMetadataManager(
+      File omDbDir)
+      throws IOException {
+    omConfiguration = new OzoneConfiguration();
+    omConfiguration.set(OZONE_OM_DB_DIRS,
+        omDbDir.getAbsolutePath());
+    omConfiguration.set(OMConfigKeys
+        .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
+    omMetadataManager = new OmMetadataManagerImpl(
+        omConfiguration);
+
+    String volumeKey = omMetadataManager.getVolumeKey(VOL);
+    OmVolumeArgs args =
+        OmVolumeArgs.newBuilder()
+            .setObjectID(VOL_OBJECT_ID)
+            .setVolume(VOL)
+            .setAdminName(TEST_USER)
+            .setOwnerName(TEST_USER)
+            .build();
+    omMetadataManager.getVolumeTable().put(volumeKey, args);
+
+    OmBucketInfo bucketInfo1 = OmBucketInfo.newBuilder()
+        .setVolumeName(VOL)
+        .setBucketName(BUCKET_ONE)
+        .setObjectID(BUCKET_ONE_OBJECT_ID)
+        .setBucketLayout(getBucketLayout())
+        .build();
+
+    OmBucketInfo bucketInfo2 = OmBucketInfo.newBuilder()
+        .setVolumeName(VOL)
+        .setBucketName(BUCKET_TWO)
+        .setObjectID(BUCKET_TWO_OBJECT_ID)
+        .setBucketLayout(getBucketLayout())
+        .build();
+
+    String bucketKey = omMetadataManager.getBucketKey(
+        bucketInfo1.getVolumeName(), bucketInfo1.getBucketName());
+    String bucketKey2 = omMetadataManager.getBucketKey(
+        bucketInfo2.getVolumeName(), bucketInfo2.getBucketName());
+
+    omMetadataManager.getBucketTable().put(bucketKey, bucketInfo1);
+    omMetadataManager.getBucketTable().put(bucketKey2, bucketInfo2);
   }
 
   private static BucketLayout getBucketLayout() {
-    return BucketLayout.FILE_SYSTEM_OPTIMIZED;
+    return BucketLayout.LEGACY;
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/DiskUsageSubCommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/DiskUsageSubCommand.java
index 3a4a16d54d..5934889432 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/DiskUsageSubCommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/DiskUsageSubCommand.java
@@ -32,7 +32,7 @@ import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.getRespo
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.makeHttpCall;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.parseInputPath;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printEmptyPathRequest;
-import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printFSOReminder;
+import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printBucketReminder;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printKVSeparator;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printNewLines;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printPathNotFound;
@@ -106,8 +106,9 @@ public class DiskUsageSubCommand implements Callable {
     if (duResponse.get("status").equals("PATH_NOT_FOUND")) {
       printPathNotFound();
     } else {
-      if (!parent.isFileSystemOptimizedBucket(path)) {
-        printFSOReminder();
+      if (parent.isObjectStoreBucket(path) ||
+          !parent.bucketIsPresentInThePath(path)) {
+        printBucketReminder();
       }
 
       long totalSize = (long)(double)duResponse.get("size");
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/FileSizeDistSubCommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/FileSizeDistSubCommand.java
index 9f02121c8f..5a2a2d11c0 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/FileSizeDistSubCommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/FileSizeDistSubCommand.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Callable;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.getResponseMap;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.makeHttpCall;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printEmptyPathRequest;
-import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printFSOReminder;
+import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printBucketReminder;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printNewLines;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printPathNotFound;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printSpaces;
@@ -80,8 +80,9 @@ public class FileSizeDistSubCommand implements Callable {
     } else if (distResponse.get("status").equals("TYPE_NOT_APPLICABLE")) {
       printTypeNA("File Size Distribution");
     } else {
-      if (!parent.isFileSystemOptimizedBucket(path)) {
-        printFSOReminder();
+      if (parent.isObjectStoreBucket(path) ||
+          !parent.bucketIsPresentInThePath(path)) {
+        printBucketReminder();
       }
 
       printWithUnderline("File Size Distribution", true);
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryAdmin.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryAdmin.java
index 220365883d..727be27670 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryAdmin.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryAdmin.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.kohsuke.MetaInfServices;
 import picocli.CommandLine;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Objects;
 
 import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
@@ -108,6 +110,61 @@ public class NSSummaryAdmin extends GenericCli implements SubcommandWithParent {
     }
   }
 
+  public boolean isObjectStoreBucket(String path) throws IOException {
+    OFSPath ofsPath = new OFSPath(path);
+
+    boolean enableFileSystemPaths = getOzoneConfig()
+        .getBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+            OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT);
+
+    OzoneClient ozoneClient = OzoneClientFactory.getRpcClient(getOzoneConfig());
+    ObjectStore objectStore = ozoneClient.getObjectStore();
+
+    try {
+      OzoneBucket bucket = objectStore.getVolume(ofsPath.getVolumeName())
+          .getBucket(ofsPath.getBucketName());
+
+      // Resolve the bucket layout in case this is a Link Bucket.
+      BucketLayout resolvedBucketLayout =
+          OzoneClientUtils.resolveLinkBucketLayout(bucket, objectStore,
+              new HashSet<>());
+
+      return resolvedBucketLayout.isObjectStore(enableFileSystemPaths);
+    } catch (IOException e) {
+      System.out.println(
+          "Bucket layout couldn't be verified for path: " + ofsPath +
+              ". Exception: " + e);
+      return false;
+    }
+  }
+
+  /**
+   * Checks whether a bucket is part of the given path.
+   * Returns false if the path is root, just a volume, or invalid.
+   * @param path path to check
+   * @return true if a bucket is present in the given path.
+   * @throws IOException
+   */
+  public boolean bucketIsPresentInThePath(String path) throws IOException {
+    OFSPath ofsPath = new OFSPath(path);
+
+    OzoneClient ozoneClient = OzoneClientFactory.getRpcClient(getOzoneConfig());
+    ObjectStore objectStore = ozoneClient.getObjectStore();
+
+    try {
+      OzoneBucket bucket = objectStore.getVolume(ofsPath.getVolumeName())
+          .getBucket(ofsPath.getBucketName());
+
+      return Objects.nonNull(bucket);
+    } catch (IOException e) {
+      System.out.println(
+          "Bucket couldn't be resolved for path: " + ofsPath +
+              ". Exception: " + e);
+      return false;
+    }
+  }
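+
+  // The nssummary subcommands combine the two checks above: a reminder is
+  // printed when the path resolves to an OBS bucket, or when no bucket can
+  // be resolved from the path at all.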
+
   /**
    * e.g. Input: "0.0.0.0:9891" -> Output: "0.0.0.0"
    */
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryCLIUtils.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryCLIUtils.java
index 9c56924af2..729aa20c5c 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryCLIUtils.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/NSSummaryCLIUtils.java
@@ -152,12 +152,13 @@ public final class NSSummaryCLIUtils {
     }
   }
 
-  public static void printFSOReminder() {
+  public static void printBucketReminder() {
     printNewLines(1);
     System.out.println(
-        "[Warning] Namespace CLI is only designed for FSO mode.\n" +
-            "Bucket being accessed must be of type FILE_SYSTEM_OPTIMIZED" +
-            " bucket layout.");
+        "[Warning] Namespace CLI is not designed for OBS bucket layout.\n" +
+            "Bucket being accessed must be of type FILE_SYSTEM_OPTIMIZED " +
+            "bucket layout or \nLEGACY bucket layout with " +
+            "'ozone.om.enable.filesystem.paths' set to true.");
     printNewLines(1);
   }
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/QuotaUsageSubCommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/QuotaUsageSubCommand.java
index 88a7b2a554..c3494cf4ff 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/QuotaUsageSubCommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/QuotaUsageSubCommand.java
@@ -27,7 +27,7 @@ import java.util.concurrent.Callable;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.getResponseMap;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.makeHttpCall;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printEmptyPathRequest;
-import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printFSOReminder;
+import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printBucketReminder;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printKVSeparator;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printNewLines;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printPathNotFound;
@@ -80,8 +80,9 @@ public class QuotaUsageSubCommand implements Callable {
     } else if (quotaResponse.get("status").equals("TYPE_NOT_APPLICABLE")) {
       printTypeNA("Quota");
     } else {
-      if (!parent.isFileSystemOptimizedBucket(path)) {
-        printFSOReminder();
+      if (parent.isObjectStoreBucket(path) ||
+          !parent.bucketIsPresentInThePath(path)) {
+        printBucketReminder();
       }
 
       printWithUnderline("Quota", true);
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/SummarySubCommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/SummarySubCommand.java
index c0d2ed7f0c..4a4946bb80 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/SummarySubCommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/nssummary/SummarySubCommand.java
@@ -27,7 +27,7 @@ import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.getRespo
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.makeHttpCall;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.parseInputPath;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printEmptyPathRequest;
-import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printFSOReminder;
+import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printBucketReminder;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printKVSeparator;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printNewLines;
 import static org.apache.hadoop.ozone.admin.nssummary.NSSummaryCLIUtils.printPathNotFound;
@@ -76,8 +76,9 @@ public class SummarySubCommand implements Callable<Void> {
     if (summaryResponse.get("status").equals("PATH_NOT_FOUND")) {
       printPathNotFound();
     } else {
-      if (!parent.isFileSystemOptimizedBucket(path)) {
-        printFSOReminder();
+      if (parent.isObjectStoreBucket(path) ||
+          !parent.bucketIsPresentInThePath(path)) {
+        printBucketReminder();
       }
 
       printWithUnderline("Entity Type", false);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org