You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ra...@apache.org on 2020/11/20 10:15:40 UTC

[ozone] 03/04: HDDS-4332: ListFileStatus - do lookup in directory and file tables (#1503)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch HDDS-2939
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 8bd8e7a1523594ec23b4c76b6bc7409067a46156
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Sat Oct 31 06:32:23 2020 +0530

    HDDS-4332: ListFileStatus - do lookup in directory and file tables (#1503)
---
 .../hadoop/ozone/om/helpers/OzoneFSUtils.java      |  31 ++
 .../hadoop/fs/ozone/TestOzoneFileInterfaces.java   |  18 +-
 .../hadoop/fs/ozone/TestOzoneFileInterfacesV1.java |  66 ++++
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  78 ++--
 .../hadoop/fs/ozone/TestOzoneFileSystemV1.java     | 380 ++++++++++++++++++++
 .../ozone/freon/TestHadoopDirTreeGenerator.java    |  22 +-
 .../ozone/freon/TestHadoopDirTreeGeneratorV1.java  |  33 ++
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 398 +++++++++++++++++++++
 .../request/file/OMDirectoryCreateRequestV1.java   |   2 -
 .../ozone/om/request/file/OMFileRequest.java       | 117 +++++-
 10 files changed, 1116 insertions(+), 29 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
index 96df56f..63bfd8f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.util.StringUtils;
 import javax.annotation.Nonnull;
 import java.nio.file.Paths;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 
 /**
@@ -131,4 +132,34 @@ public final class OzoneFSUtils {
     // failed to converts a path key
     return keyName;
   }
+
+  /**
+   * Verifies whether the childKey is an immediate path under the given
+   * parentKey.
+   *
+   * @param parentKey parent key name
+   * @param childKey  child key name
+   * @return true if childKey is an immediate path under the given parentKey
+   */
+  public static boolean isImmediateChild(String parentKey, String childKey) {
+
+    // Empty childKey has no parent, so just returning false.
+    if (org.apache.commons.lang3.StringUtils.isBlank(childKey)) {
+      return false;
+    }
+    java.nio.file.Path parentPath = Paths.get(parentKey);
+    java.nio.file.Path childPath = Paths.get(childKey);
+
+    java.nio.file.Path childParent = childPath.getParent();
+    // Following are the valid parentKey formats:
+    // parentKey="" or parentKey="/" or parentKey="/a" or parentKey="a"
+    // Following are the valid childKey formats:
+    // childKey="/" or childKey="/a/b" or childKey="a/b"
+    if (org.apache.commons.lang3.StringUtils.isBlank(parentKey)) {
+      return childParent == null ||
+              OM_KEY_PREFIX.equals(childParent.toString());
+    }
+
+    return parentPath.equals(childParent);
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
index 2b8803e..1e31234 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
@@ -56,6 +56,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
+
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Assert;
 
@@ -120,7 +122,8 @@ public class TestOzoneFileInterfaces {
 
   private OMMetrics omMetrics;
 
-  private boolean enableFileSystemPaths;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected boolean enableFileSystemPaths;
 
   public TestOzoneFileInterfaces(boolean setDefaultFs,
       boolean useAbsolutePath, boolean enabledFileSystemPaths) {
@@ -135,9 +138,8 @@ public class TestOzoneFileInterfaces {
     volumeName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
     bucketName = RandomStringUtils.randomAlphabetic(10).toLowerCase();
 
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
-        enableFileSystemPaths);
+    OzoneConfiguration conf = getOzoneConfiguration();
+
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
         .build();
@@ -162,6 +164,14 @@ public class TestOzoneFileInterfaces {
     omMetrics = cluster.getOzoneManager().getMetrics();
   }
 
+  @NotNull
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+        enableFileSystemPaths);
+    return conf;
+  }
+
   @After
   public void teardown() throws IOException {
     if (cluster != null) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfacesV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfacesV1.java
new file mode 100644
index 0000000..93473be
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfacesV1.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test OzoneFileSystem Interfaces layout version V1.
+ *
+ * This test will test the various interfaces i.e.
+ * create, read, write, getFileStatus
+ */
+@RunWith(Parameterized.class)
+public class TestOzoneFileInterfacesV1 extends TestOzoneFileInterfaces {
+
+  public TestOzoneFileInterfacesV1(boolean setDefaultFs,
+      boolean useAbsolutePath, boolean enabledFileSystemPaths) {
+    super(setDefaultFs, useAbsolutePath, enabledFileSystemPaths);
+  }
+
+  @NotNull
+  @Override
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+            enableFileSystemPaths);
+    conf.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    return conf;
+  }
+
+  @Override
+  @Test
+  @Ignore("TODO:HDDS-2939")
+  public void testDirectory() {
+
+  }
+
+  @Override
+  @Test
+  @Ignore("TODO:HDDS-2939")
+  public void testOzFsReadWrite() {
+
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index bd8d45c..c9e56a7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -100,17 +101,26 @@ public class TestOzoneFileSystem {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestOzoneFileSystem.class);
 
-  private boolean enabledFileSystemPaths;
-
-  private MiniOzoneCluster cluster;
-  private FileSystem fs;
-  private OzoneFileSystem o3fs;
-  private String volumeName;
-  private String bucketName;
-  private int rootItemCount;
-  private Trash trash;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected boolean enabledFileSystemPaths;
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected MiniOzoneCluster cluster;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected FileSystem fs;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected OzoneFileSystem o3fs;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected String volumeName;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected String bucketName;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected int rootItemCount;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected Trash trash;
 
   public void testCreateFileShouldCheckExistenceOfDirWithSameName()
+
       throws Exception {
     /*
      * Op 1. create file -> /d1/d2/d3/d4/key2
@@ -156,6 +166,28 @@ public class TestOzoneFileSystem {
       // ignore as its expected
     }
 
+    // Directory
+    FileStatus fileStatus = fs.getFileStatus(parent);
+    assertEquals("FileStatus did not return the directory",
+            "/d1/d2/d3/d4", fileStatus.getPath().toUri().getPath());
+    assertTrue("FileStatus did not return the directory",
+            fileStatus.isDirectory());
+
+    // invalid sub directory
+    try{
+      fs.getFileStatus(new Path("/d1/d2/d3/d4/key3/invalid"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException fnfe) {
+      // ignore as its expected
+    }
+    // invalid file name
+    try{
+      fs.getFileStatus(new Path("/d1/d2/d3/d4/invalidkey"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException fnfe) {
+      // ignore as its expected
+    }
+
     // Cleanup
     fs.delete(new Path("/d1/"), true);
   }
@@ -249,12 +281,9 @@ public class TestOzoneFileSystem {
     }
   }
 
-  private void setupOzoneFileSystem()
+  protected void setupOzoneFileSystem()
       throws IOException, TimeoutException, InterruptedException {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(FS_TRASH_INTERVAL_KEY, 1);
-    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
-        enabledFileSystemPaths);
+    OzoneConfiguration conf = getOzoneConfig();
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
         .build();
@@ -276,7 +305,16 @@ public class TestOzoneFileSystem {
     trash = new Trash(conf);
   }
 
-  private void testOzoneFsServiceLoader() throws IOException {
+  @NotNull
+  protected OzoneConfiguration getOzoneConfig() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(FS_TRASH_INTERVAL_KEY, 1);
+    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+            enabledFileSystemPaths);
+    return conf;
+  }
+
+  protected void testOzoneFsServiceLoader() throws IOException {
     assertEquals(
         FileSystem.getFileSystemClass(OzoneConsts.OZONE_URI_SCHEME, null),
         OzoneFileSystem.class);
@@ -441,7 +479,7 @@ public class TestOzoneFileSystem {
 
   }
 
-  private void testListStatus() throws Exception {
+  protected void testListStatus() throws Exception {
     Path parent = new Path("/testListStatus");
     Path file1 = new Path(parent, "key1");
     Path file2 = new Path(parent, "key2");
@@ -489,7 +527,7 @@ public class TestOzoneFileSystem {
   /**
    * Tests listStatus operation on root directory.
    */
-  private void testListStatusOnRoot() throws Exception {
+  protected void testListStatusOnRoot() throws Exception {
     Path root = new Path("/");
     Path dir1 = new Path(root, "dir1");
     Path dir12 = new Path(dir1, "dir12");
@@ -516,7 +554,7 @@ public class TestOzoneFileSystem {
   /**
    * Tests listStatus operation on root directory.
    */
-  private void testListStatusOnLargeDirectory() throws Exception {
+  protected void testListStatusOnLargeDirectory() throws Exception {
     Path root = new Path("/");
     Set<String> paths = new TreeSet<>();
     int numDirs = 5111;
@@ -540,7 +578,7 @@ public class TestOzoneFileSystem {
   /**
    * Tests listStatus on a path with subdirs.
    */
-  private void testListStatusOnSubDirs() throws Exception {
+  protected void testListStatusOnSubDirs() throws Exception {
     // Create the following key structure
     //      /dir1/dir11/dir111
     //      /dir1/dir12
@@ -653,7 +691,7 @@ public class TestOzoneFileSystem {
     GenericTestUtils.assertExceptionContains("KEY_NOT_FOUND", ex);
   }
 
-  private void testGetDirectoryModificationTime()
+  protected void testGetDirectoryModificationTime()
       throws IOException, InterruptedException {
     Path mdir1 = new Path("/mdir1");
     Path mdir11 = new Path(mdir1, "mdir11");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
new file mode 100644
index 0000000..415aec8
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Ozone file system tests that are not covered by contract tests,
+ * layout version V1.
+ *
+ * Note: When adding new test(s), please append it in testFileSystem() to
+ * avoid test run time regression.
+ */
+@RunWith(Parameterized.class)
+public class TestOzoneFileSystemV1 extends TestOzoneFileSystem {
+
+  public TestOzoneFileSystemV1(boolean setDefaultFs) {
+    super(setDefaultFs);
+  }
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestOzoneFileSystemV1.class);
+
+  private void testListStatusWithoutRecursiveSearch() throws Exception {
+    /*
+     * Op 1. create file -> /key1
+     * Op 2. create dir -> /d1/d2
+     * Op 3. create dir -> /d1/d3
+     * Op 4. create dir -> /d1/d4
+     * Op 5. create file -> /d1/key1
+     * Op 6. create file -> /d2/key1
+     * Op 7. create file -> /d1/d2/key1
+     */
+    Path key1 = new Path("/key1");
+    try (FSDataOutputStream outputStream = fs.create(key1, false)) {
+      assertNotNull("Should be able to create file: key1",
+              outputStream);
+    }
+    Path d1 = new Path("/d1");
+    Path dir1Key1 = new Path(d1, "key1");
+    try (FSDataOutputStream outputStream = fs.create(dir1Key1, false)) {
+      assertNotNull("Should be able to create file: " + dir1Key1,
+              outputStream);
+    }
+    Path d2 = new Path("/d2");
+    Path dir2Key1 = new Path(d2, "key1");
+    try (FSDataOutputStream outputStream = fs.create(dir2Key1, false)) {
+      assertNotNull("Should be able to create file: " + dir2Key1,
+              outputStream);
+    }
+    Path dir1Dir2 = new Path("/d1/d2/");
+    Path dir1Dir2Key1 = new Path(dir1Dir2, "key1");
+    try (FSDataOutputStream outputStream = fs.create(dir1Dir2Key1, false)) {
+      assertNotNull("Should be able to create file: " + dir1Dir2Key1,
+              outputStream);
+    }
+    Path d1Key2 = new Path(d1, "key2");
+    try (FSDataOutputStream outputStream = fs.create(d1Key2, false)) {
+      assertNotNull("Should be able to create file: " + d1Key2,
+              outputStream);
+    }
+
+    Path dir1Dir3 = new Path("/d1/d3/");
+    Path dir1Dir4 = new Path("/d1/d4/");
+
+    fs.mkdirs(dir1Dir3);
+    fs.mkdirs(dir1Dir4);
+
+    // Root Directory
+    FileStatus[] fileStatusList = fs.listStatus(new Path("/"));
+    assertEquals("FileStatus should return files and directories",
+            3, fileStatusList.length);
+    ArrayList<String> expectedPaths = new ArrayList<>();
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1");
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d2");
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/key1");
+    for (FileStatus fileStatus : fileStatusList) {
+      expectedPaths.remove(fileStatus.getPath().toString());
+    }
+    assertEquals("Failed to return the filestatus[]" + expectedPaths,
+            0, expectedPaths.size());
+
+    // level-1 sub-dirs
+    fileStatusList = fs.listStatus(new Path("/d1"));
+    assertEquals("FileStatus should return files and directories",
+            5, fileStatusList.length);
+    expectedPaths = new ArrayList<>();
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1/d2");
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1/d3");
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1/d4");
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1/key1");
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1/key2");
+    for (FileStatus fileStatus : fileStatusList) {
+      expectedPaths.remove(fileStatus.getPath().toString());
+    }
+    assertEquals("Failed to return the filestatus[]" + expectedPaths,
+            0, expectedPaths.size());
+
+    // level-2 sub-dirs
+    fileStatusList = fs.listStatus(new Path("/d1/d2"));
+    assertEquals("FileStatus should return files and directories",
+            1, fileStatusList.length);
+    expectedPaths = new ArrayList<>();
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1/d2/" +
+            "key1");
+    for (FileStatus fileStatus : fileStatusList) {
+      expectedPaths.remove(fileStatus.getPath().toString());
+    }
+    assertEquals("Failed to return the filestatus[]" + expectedPaths,
+            0, expectedPaths.size());
+
+    // level-2 key2
+    fileStatusList = fs.listStatus(new Path("/d1/d2/key1"));
+    assertEquals("FileStatus should return files and directories",
+            1, fileStatusList.length);
+    expectedPaths = new ArrayList<>();
+    expectedPaths.add("o3fs://" + bucketName + "." + volumeName + "/d1/d2/" +
+            "key1");
+    for (FileStatus fileStatus : fileStatusList) {
+      expectedPaths.remove(fileStatus.getPath().toString());
+    }
+    assertEquals("Failed to return the filestatus[]" + expectedPaths,
+            0, expectedPaths.size());
+
+    // invalid root key
+    try {
+      fileStatusList = fs.listStatus(new Path("/key2"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException fnfe) {
+      // ignore as its expected
+    }
+    try {
+      fileStatusList = fs.listStatus(new Path("/d1/d2/key2"));
+      fail("Should throw FileNotFoundException");
+    } catch (FileNotFoundException fnfe) {
+      // ignore as its expected
+
+    }
+  }
+
+  private void testListFilesRecursive() throws Exception {
+    /*
+     * Op 1. create file -> /d1/d1/d2/key1
+     * Op 2. create dir -> /key1
+     * Op 3. create dir -> /key2
+     * Op 4. create dir -> /d1/d2/d1/d2/key1
+     */
+    Path dir1Dir1Dir2Key1 = new Path("/d1/d1/d2/key1");
+    try (FSDataOutputStream outputStream = fs.create(dir1Dir1Dir2Key1,
+            false)) {
+      assertNotNull("Should be able to create file: " + dir1Dir1Dir2Key1,
+              outputStream);
+    }
+    Path key1 = new Path("/key1");
+    try (FSDataOutputStream outputStream = fs.create(key1, false)) {
+      assertNotNull("Should be able to create file: " + key1,
+              outputStream);
+    }
+    Path key2 = new Path("/key2");
+    try (FSDataOutputStream outputStream = fs.create(key2, false)) {
+      assertNotNull("Should be able to create file: key2",
+              outputStream);
+    }
+    Path dir1Dir2Dir1Dir2Key1 = new Path("/d1/d2/d1/d2/key1");
+    try (FSDataOutputStream outputStream = fs.create(dir1Dir2Dir1Dir2Key1,
+            false)) {
+      assertNotNull("Should be able to create file: "
+              + dir1Dir2Dir1Dir2Key1, outputStream);
+    }
+    RemoteIterator<LocatedFileStatus> fileStatusItr = fs.listFiles(new Path(
+            "/"), true);
+    String uriPrefix = "o3fs://" + bucketName + "." + volumeName;
+    ArrayList<String> expectedPaths = new ArrayList<>();
+    expectedPaths.add(uriPrefix + dir1Dir1Dir2Key1.toString());
+    expectedPaths.add(uriPrefix + key1.toString());
+    expectedPaths.add(uriPrefix + key2.toString());
+    expectedPaths.add(uriPrefix + dir1Dir2Dir1Dir2Key1.toString());
+    int expectedFilesCount = expectedPaths.size();
+    int actualCount = 0;
+    while (fileStatusItr.hasNext()) {
+      LocatedFileStatus status = fileStatusItr.next();
+      expectedPaths.remove(status.getPath().toString());
+      actualCount++;
+    }
+    assertEquals("Failed to get all the files: " + expectedPaths,
+            expectedFilesCount, actualCount);
+    assertEquals("Failed to get all the files: " + expectedPaths, 0,
+            expectedPaths.size());
+
+    // Recursive=false
+    fileStatusItr = fs.listFiles(new Path("/"), false);
+    expectedPaths.clear();
+    expectedPaths.add(uriPrefix + "/key1");
+    expectedPaths.add(uriPrefix + "/key2");
+    expectedFilesCount = expectedPaths.size();
+    actualCount = 0;
+    while (fileStatusItr.hasNext()) {
+      LocatedFileStatus status = fileStatusItr.next();
+      expectedPaths.remove(status.getPath().toString());
+      actualCount++;
+    }
+    assertEquals("Failed to get all the files: " + expectedPaths, 0,
+            expectedPaths.size());
+    assertEquals("Failed to get all the files: " + expectedPaths,
+            expectedFilesCount, actualCount);
+  }
+
+  @Test(timeout = 300_000)
+  @Override
+  public void testFileSystem() throws Exception {
+    setupOzoneFileSystem();
+
+    testOzoneFsServiceLoader();
+    o3fs = (OzoneFileSystem) fs;
+
+    testCreateFileShouldCheckExistenceOfDirWithSameName();
+    // TODO: Cleanup keyTable and dirTable explicitly as FS delete operation
+    //  is not yet implemented. This should be replaced with fs.delete() call.
+    tableCleanup();
+    testMakeDirsWithAnExistingDirectoryPath();
+    tableCleanup();
+    testCreateWithInvalidPaths();
+    tableCleanup();
+    testListStatusWithoutRecursiveSearch();
+    tableCleanup();
+    testListFilesRecursive();
+    tableCleanup();
+
+    testGetDirectoryModificationTime();
+    tableCleanup();
+
+    testListStatusOnRoot();
+    tableCleanup();
+    testListStatus();
+    tableCleanup();
+    testListStatusOnSubDirs();
+    tableCleanup();
+    testListStatusOnLargeDirectory();
+    tableCleanup();
+  }
+
+  /**
+   * Cleanup keyTable and directoryTable explicitly as FS delete operation
+   * is not yet supported.
+   *
+   * @throws IOException DB failure
+   */
+  protected void tableCleanup() throws IOException {
+    OMMetadataManager metadataMgr = cluster.getOzoneManager()
+            .getMetadataManager();
+    TableIterator<String, ? extends
+            Table.KeyValue<String, OmDirectoryInfo>> dirTableIterator =
+            metadataMgr.getDirectoryTable().iterator();
+    dirTableIterator.seekToFirst();
+    ArrayList <String> dirList = new ArrayList<>();
+    while (dirTableIterator.hasNext()) {
+      String key = dirTableIterator.key();
+      if (StringUtils.isNotBlank(key)) {
+        dirList.add(key);
+      }
+      dirTableIterator.next();
+    }
+
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
+            cacheIterator = metadataMgr.getDirectoryTable().cacheIterator();
+    while(cacheIterator.hasNext()){
+      cacheIterator.next();
+      cacheIterator.remove();
+    }
+
+    for (String dirKey : dirList) {
+      metadataMgr.getDirectoryTable().delete(dirKey);
+      Assert.assertNull("Unexpected entry!",
+              metadataMgr.getDirectoryTable().get(dirKey));
+    }
+
+    Assert.assertTrue("DirTable is not empty",
+            metadataMgr.getDirectoryTable().isEmpty());
+
+    Assert.assertFalse(metadataMgr.getDirectoryTable().cacheIterator()
+            .hasNext());
+
+    TableIterator<String, ? extends
+            Table.KeyValue<String, OmKeyInfo>> keyTableIterator =
+            metadataMgr.getKeyTable().iterator();
+    keyTableIterator.seekToFirst();
+    ArrayList <String> fileList = new ArrayList<>();
+    while (keyTableIterator.hasNext()) {
+      String key = keyTableIterator.key();
+      if (StringUtils.isNotBlank(key)) {
+        fileList.add(key);
+      }
+      keyTableIterator.next();
+    }
+
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
+            keyCacheIterator = metadataMgr.getDirectoryTable().cacheIterator();
+    while(keyCacheIterator.hasNext()){
+      keyCacheIterator.next();
+      keyCacheIterator.remove();
+    }
+
+    for (String fileKey : fileList) {
+      metadataMgr.getKeyTable().delete(fileKey);
+      Assert.assertNull("Unexpected entry!",
+              metadataMgr.getKeyTable().get(fileKey));
+    }
+
+    Assert.assertTrue("KeyTable is not empty",
+            metadataMgr.getKeyTable().isEmpty());
+
+    rootItemCount = 0;
+  }
+
+  @NotNull
+  @Override
+  protected OzoneConfiguration getOzoneConfig() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(FS_TRASH_INTERVAL_KEY, 1);
+    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+        enabledFileSystemPaths);
+    conf.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    return conf;
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHadoopDirTreeGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHadoopDirTreeGenerator.java
index 6f5b113..9b07dcc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHadoopDirTreeGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHadoopDirTreeGenerator.java
@@ -31,12 +31,15 @@ import org.apache.ratis.server.raftlog.RaftLog;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 
 /**
  * Test for HadoopDirTreeGenerator.
@@ -47,6 +50,8 @@ public class TestHadoopDirTreeGenerator {
   private OzoneConfiguration conf = null;
   private MiniOzoneCluster cluster = null;
   private ObjectStore store = null;
+  private static final Logger LOG =
+          LoggerFactory.getLogger(TestHadoopDirTreeGenerator.class);
 
   @Before
   public void setup() {
@@ -74,7 +79,7 @@ public class TestHadoopDirTreeGenerator {
    * @throws IOException
    */
   private void startCluster() throws Exception {
-    conf = new OzoneConfiguration();
+    conf = getOzoneConfiguration();
 
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
@@ -83,6 +88,10 @@ public class TestHadoopDirTreeGenerator {
     store = OzoneClientFactory.getRpcClient(conf).getObjectStore();
   }
 
+  protected OzoneConfiguration getOzoneConfiguration() {
+    return new OzoneConfiguration();
+  }
+
   @Test
   public void testNestedDirTreeGeneration() throws Exception {
     try {
@@ -103,6 +112,9 @@ public class TestHadoopDirTreeGenerator {
               2, 4, 2);
       verifyDirTree("vol5", "bucket1", 5,
               4, 1, 0);
+      // default page size is Constants.LISTING_PAGE_SIZE = 1024
+      verifyDirTree("vol6", "bucket1", 2,
+              1, 1100, 0);
     } finally {
       shutdown();
     }
@@ -122,6 +134,7 @@ public class TestHadoopDirTreeGenerator {
             fileCount + "", "-s", span + "", "-n", "1", "-r", rootPath,
                      "-g", perFileSizeInBytes + ""});
     // verify the directory structure
+    LOG.info("Started verifying the directory structure...");
     FileSystem fileSystem = FileSystem.get(URI.create(rootPath),
             conf);
     Path rootDir = new Path(rootPath.concat("/"));
@@ -149,6 +162,7 @@ public class TestHadoopDirTreeGenerator {
       verifyActualSpan(expectedSpanCnt, fileStatuses);
     }
     int actualNumFiles = 0;
+    ArrayList <String> files = new ArrayList<>();
     for (FileStatus fileStatus : fileStatuses) {
       if (fileStatus.isDirectory()) {
         ++depth;
@@ -157,6 +171,12 @@ public class TestHadoopDirTreeGenerator {
       } else {
         Assert.assertEquals("Mismatches file len",
                 perFileSizeInBytes, fileStatus.getLen());
+        String fName = fileStatus.getPath().getName();
+        Assert.assertFalse("actualNumFiles:" + actualNumFiles +
+                        ", fName:" + fName + ", expectedFileCnt:" +
+                        expectedFileCnt + ", depth:" + depth,
+                files.contains(fName));
+        files.add(fName);
         actualNumFiles++;
       }
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHadoopDirTreeGeneratorV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHadoopDirTreeGeneratorV1.java
new file mode 100644
index 0000000..99d4f26
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestHadoopDirTreeGeneratorV1.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+
+/**
+ * Test for HadoopDirTreeGenerator layout version V1.
+ */
+public class TestHadoopDirTreeGeneratorV1 extends TestHadoopDirTreeGenerator {
+
+  protected OzoneConfiguration getOzoneConfiguration() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OMConfigKeys.OZONE_OM_LAYOUT_VERSION, "V1");
+    return conf;
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 72ebfd2..0520db8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -75,6 +76,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -94,7 +96,9 @@ import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -124,6 +128,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BL
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
@@ -1780,6 +1785,10 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
 
+    if (OzoneManagerRatisUtils.isOmLayoutVersionV1()) {
+      return getOzoneFileStatusV1(volumeName, bucketName, keyName,
+              args.getSortDatanodes(), clientAddress, false);
+    }
     return getOzoneFileStatus(volumeName, bucketName, keyName,
             args.getRefreshPipeline(), args.getSortDatanodes(), clientAddress);
   }
@@ -1844,6 +1853,65 @@ public class KeyManagerImpl implements KeyManager {
             FILE_NOT_FOUND);
   }
 
+
+  private OzoneFileStatus getOzoneFileStatusV1(String volumeName,
+      String bucketName, String keyName, boolean sortDatanodes,
+      String clientAddress, boolean skipFileNotFoundError) throws IOException {
+    OzoneFileStatus fileStatus = null;
+    metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
+            bucketName);
+    try {
+      // Check if this is the root of the filesystem.
+      if (keyName.length() == 0) {
+        validateBucket(volumeName, bucketName);
+        return new OzoneFileStatus();
+      }
+
+      fileStatus = OMFileRequest.getOMKeyInfoIfExists(metadataManager,
+              volumeName, bucketName, keyName, scmBlockSize);
+
+    } finally {
+      metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName,
+              bucketName);
+    }
+
+    if (fileStatus != null) {
+      // if the key is a file then do refresh pipeline info in OM by asking SCM
+      if (fileStatus.isFile()) {
+
+        OmKeyInfo fileKeyInfo = fileStatus.getKeyInfo();
+
+        // refreshPipeline flag check has been removed as part of
+        // https://issues.apache.org/jira/browse/HDDS-3658.
+        // Please refer this jira for more details.
+        refreshPipeline(fileKeyInfo);
+
+        if (sortDatanodes) {
+          sortDatanodeInPipeline(fileKeyInfo, clientAddress);
+        }
+        return new OzoneFileStatus(fileKeyInfo, scmBlockSize, false);
+      } else {
+        return fileStatus;
+      }
+    }
+
+    // Key not found.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unable to get file status for the key: volume: {}, bucket:" +
+                      " {}, key: {}, with error: No such file exists.",
+              volumeName, bucketName, keyName);
+    }
+
+    // don't throw exception if this flag is true.
+    if (skipFileNotFoundError) {
+      return fileStatus;
+    }
+
+    throw new OMException("Unable to get file status: volume: " +
+            volumeName + " bucket: " + bucketName + " key: " + keyName,
+            FILE_NOT_FOUND);
+  }
+
   /**
    * Ozone FS api to create a directory. Parent directories if do not exist
    * are created for the input directory.
@@ -2083,6 +2151,10 @@ public class KeyManagerImpl implements KeyManager {
       return fileStatusList;
     }
 
+    if (OzoneManagerRatisUtils.isOmLayoutVersionV1()) {
+      return listStatusV1(args, recursive, startKey, numEntries, clientAddress);
+    }
+
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
@@ -2220,6 +2292,332 @@ public class KeyManagerImpl implements KeyManager {
     return fileStatusList;
   }
 
+  public List<OzoneFileStatus> listStatusV1(OmKeyArgs args, boolean recursive,
+      String startKey, long numEntries, String clientAddress)
+          throws IOException {
+    Preconditions.checkNotNull(args, "Key args can not be null");
+
+    // unsorted OMKeyInfo list contains combine results from TableCache and DB.
+    List<OzoneFileStatus> fileStatusFinalList = new ArrayList<>();
+    LinkedHashSet<OzoneFileStatus> fileStatusList = new LinkedHashSet<>();
+    if (numEntries <= 0) {
+      return fileStatusFinalList;
+    }
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    String seekFileInDB;
+    String seekDirInDB;
+    long prefixKeyInDB;
+    String prefixPath = keyName;
+    int countEntries = 0;
+
+    // TODO: recursive flag=true will be handled in HDDS-4360 jira.
+    metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
+            bucketName);
+    try {
+      if (Strings.isNullOrEmpty(startKey)) {
+        OzoneFileStatus fileStatus = getFileStatus(args, clientAddress);
+        if (fileStatus.isFile()) {
+          return Collections.singletonList(fileStatus);
+        }
+
+        // Not required to search in DeletedTable because all the deleted
+        // keys will be marked directly in dirTable or in keyTable by
+        // breaking the pointer to its sub-dirs and sub-files. So, there is no
+        // issue of inconsistency.
+
+        /*
+         * keyName is a directory.
+         * Say, "/a" is the dir name and its objectID is 1024, then seek
+         * will be doing with "1024/" to get all immediate descendants.
+         */
+        if (fileStatus.getKeyInfo() != null) {
+          prefixKeyInDB = fileStatus.getKeyInfo().getObjectID();
+        } else {
+          // list root directory.
+          String bucketKey = metadataManager.getBucketKey(volumeName,
+                  bucketName);
+          OmBucketInfo omBucketInfo =
+                  metadataManager.getBucketTable().get(bucketKey);
+          prefixKeyInDB = omBucketInfo.getObjectID();
+        }
+        seekFileInDB = metadataManager.getOzonePathKey(prefixKeyInDB, "");
+        seekDirInDB = metadataManager.getOzonePathKey(prefixKeyInDB, "");
+
+        // Order of seek -> (1)Seek dirs in dirTable (2)Seek files in fileTable
+        // 1. Seek the given key in key table.
+        countEntries = getFilesFromDirectory(fileStatusList, seekFileInDB,
+                prefixPath, prefixKeyInDB, startKey, countEntries, numEntries);
+        // 2. Seek the given key in dir table.
+        getDirectories(fileStatusList, seekDirInDB, prefixPath, prefixKeyInDB,
+                startKey, countEntries, numEntries, volumeName, bucketName,
+                recursive);
+      } else {
+        /*
+         * startKey will be used in iterator seek and sets the beginning point
+         * for key traversal.
+         * keyName will be used as parentID where the user has requested to
+         * list the keys from.
+         *
+         * When recursive flag=false, parentID won't change between two pages.
+         * For example: OM has a namespace like,
+         *    /a/1...1M files and /a/b/1...1M files.
+         *    /a/1...1M directories and /a/b/1...1M directories.
+         * Listing "/a", will always have the parentID as "a" irrespective of
+         * the startKey value.
+         */
+
+        // Check startKey is an immediate child of keyName. For example,
+        // keyName=/a/ and expected startKey=/a/b. startKey can't be /xyz/b.
+        if (!OzoneFSUtils.isImmediateChild(keyName, startKey)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("StartKey {} is not an immediate child of keyName {}. " +
+                    "Returns empty list", startKey, keyName);
+          }
+          return Collections.emptyList();
+        }
+
+        OzoneFileStatus fileStatusInfo = getOzoneFileStatusV1(volumeName,
+                bucketName, startKey, false, null, true);
+
+        if (fileStatusInfo != null) {
+          prefixKeyInDB = fileStatusInfo.getKeyInfo().getParentObjectID();
+          if(fileStatusInfo.isDirectory()){
+            seekDirInDB = metadataManager.getOzonePathKey(prefixKeyInDB,
+                    fileStatusInfo.getKeyInfo().getFileName());
+
+            // Order of seek -> (1) Seek dirs only in dirTable. In OM, always
+            // the order of search is, first seek into fileTable and then
+            // dirTable. So, its not required to search again in the fileTable.
+
+            // Seek the given key in dirTable.
+            getDirectories(fileStatusList, seekDirInDB, prefixPath,
+                    prefixKeyInDB, startKey, countEntries, numEntries,
+                    volumeName, bucketName, recursive);
+          } else {
+            seekFileInDB = metadataManager.getOzonePathKey(prefixKeyInDB,
+                    fileStatusInfo.getKeyInfo().getFileName());
+            // begins from the first sub-dir under the parent dir
+            seekDirInDB = metadataManager.getOzonePathKey(prefixKeyInDB, "");
+
+            // 1. Seek the given key in key table.
+            countEntries = getFilesFromDirectory(fileStatusList, seekFileInDB,
+                    prefixPath, prefixKeyInDB, startKey, countEntries,
+                    numEntries);
+            // 2. Seek the given key in dir table.
+            getDirectories(fileStatusList, seekDirInDB, prefixPath,
+                    prefixKeyInDB, startKey, countEntries, numEntries,
+                    volumeName, bucketName, recursive);
+          }
+        } else {
+          // TODO: HDDS-4364: startKey can be a non-existed key
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("StartKey {} is a non-existed key and returning empty " +
+                    "list", startKey);
+          }
+          return Collections.emptyList();
+        }
+      }
+    } finally {
+      metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName,
+              bucketName);
+    }
+    for (OzoneFileStatus fileStatus : fileStatusList) {
+      if (fileStatus.isFile()) {
+        // refreshPipeline flag check has been removed as part of
+        // https://issues.apache.org/jira/browse/HDDS-3658.
+        // Please refer this jira for more details.
+        refreshPipeline(fileStatus.getKeyInfo());
+
+        // No need to check if a key is deleted or not here, this is handled
+        // when adding entries to cacheKeyMap from DB.
+        if (args.getSortDatanodes()) {
+          sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress);
+        }
+      }
+    }
+    fileStatusFinalList.addAll(fileStatusList);
+    return fileStatusFinalList;
+  }
+
+  @SuppressWarnings("parameternumber")
+  protected int getDirectories(Set<OzoneFileStatus> fileStatusList,
+      String seekDirInDB, String prefixPath, long prefixKeyInDB,
+      String startKey, int countEntries, long numEntries, String volumeName,
+      String bucketName, boolean recursive) throws IOException {
+
+    Table dirTable = metadataManager.getDirectoryTable();
+    countEntries = listStatusFindDirsInTableCache(fileStatusList, dirTable,
+            prefixKeyInDB, seekDirInDB, prefixPath, startKey, volumeName,
+            bucketName, countEntries, numEntries);
+    TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
+            iterator = dirTable.iterator();
+
+    iterator.seek(seekDirInDB);
+
+    while (iterator.hasNext() && numEntries - countEntries > 0) {
+      OmDirectoryInfo dirInfo = iterator.value().getValue();
+      if (!isImmediateChild(dirInfo.getParentObjectID(), prefixKeyInDB)) {
+        break;
+      }
+
+      // TODO: recursive list will be handled in HDDS-4360 jira.
+      if (!recursive) {
+        String dirName = OMFileRequest.getAbsolutePath(prefixPath,
+                dirInfo.getName());
+        OmKeyInfo omKeyInfo = OMFileRequest.getOmKeyInfo(volumeName,
+                bucketName, dirInfo, dirName);
+        fileStatusList.add(new OzoneFileStatus(omKeyInfo, scmBlockSize,
+                true));
+        countEntries++;
+      }
+      // move to next entry in the DirTable
+      iterator.next();
+    }
+
+    return countEntries;
+  }
+
+  private int getFilesFromDirectory(Set<OzoneFileStatus> fileStatusList,
+      String seekKeyInDB, String prefixKeyPath, long prefixKeyInDB,
+      String startKey, int countEntries, long numEntries) throws IOException {
+
+    Table<String, OmKeyInfo> keyTable = metadataManager.getKeyTable();
+    countEntries = listStatusFindFilesInTableCache(fileStatusList, keyTable,
+            prefixKeyInDB, seekKeyInDB, prefixKeyPath, startKey,
+            countEntries, numEntries);
+    TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+            iterator = keyTable.iterator();
+    iterator.seek(seekKeyInDB);
+    while (iterator.hasNext() && numEntries - countEntries > 0) {
+      OmKeyInfo keyInfo = iterator.value().getValue();
+
+      if (!isImmediateChild(keyInfo.getParentObjectID(), prefixKeyInDB)) {
+        break;
+      }
+
+      keyInfo.setFileName(keyInfo.getKeyName());
+      String fullKeyPath = OMFileRequest.getAbsolutePath(prefixKeyPath,
+              keyInfo.getKeyName());
+      keyInfo.setKeyName(fullKeyPath);
+      fileStatusList.add(new OzoneFileStatus(keyInfo, scmBlockSize, false));
+      countEntries++;
+      iterator.next(); // move to next entry in the table
+    }
+    return countEntries;
+  }
+
+  private boolean isImmediateChild(long parentId, long ancestorId) {
+    return parentId == ancestorId;
+  }
+
+  /**
+   * Helper function for listStatus to find key in FileTableCache.
+   */
+  @SuppressWarnings("parameternumber")
+  private int listStatusFindFilesInTableCache(
+          Set<OzoneFileStatus> fileStatusList, Table<String,
+          OmKeyInfo> keyTable, long prefixKeyInDB, String seekKeyInDB,
+          String prefixKeyPath, String startKey, int countEntries,
+          long numEntries) {
+
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>>
+            cacheIter = keyTable.cacheIterator();
+
+    // TODO: recursive list will be handled in HDDS-4360 jira.
+    while (cacheIter.hasNext() && numEntries - countEntries > 0) {
+      Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry =
+              cacheIter.next();
+      String cacheKey = entry.getKey().getCacheKey();
+      OmKeyInfo cacheOmKeyInfo = entry.getValue().getCacheValue();
+      // cacheOmKeyInfo is null if an entry is deleted in cache
+      if(cacheOmKeyInfo == null){
+        continue;
+      }
+
+      cacheOmKeyInfo.setFileName(cacheOmKeyInfo.getKeyName());
+      String fullKeyPath = OMFileRequest.getAbsolutePath(prefixKeyPath,
+              cacheOmKeyInfo.getKeyName());
+      cacheOmKeyInfo.setKeyName(fullKeyPath);
+
+      countEntries = addKeyInfoToFileStatusList(fileStatusList, prefixKeyInDB,
+              seekKeyInDB, startKey, countEntries, cacheKey, cacheOmKeyInfo,
+              false);
+    }
+    return countEntries;
+  }
+
+  /**
+   * Helper function for listStatus to find key in DirTableCache.
+   */
+  @SuppressWarnings("parameternumber")
+  private int listStatusFindDirsInTableCache(
+          Set<OzoneFileStatus> fileStatusList, Table<String,
+          OmDirectoryInfo> dirTable, long prefixKeyInDB, String seekKeyInDB,
+          String prefixKeyPath, String startKey, String volumeName,
+          String bucketName, int countEntries, long numEntries) {
+
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
+            cacheIter = dirTable.cacheIterator();
+    // seekKeyInDB will have two type of values.
+    // 1. "1024/"   -> startKey is null or empty
+    // 2. "1024/b"  -> startKey exists
+    // TODO: recursive list will be handled in HDDS-4360 jira.
+    while (cacheIter.hasNext() && numEntries - countEntries > 0) {
+      Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>> entry =
+              cacheIter.next();
+      String cacheKey = entry.getKey().getCacheKey();
+      OmDirectoryInfo cacheOmDirInfo = entry.getValue().getCacheValue();
+      if(cacheOmDirInfo == null){
+        continue;
+      }
+      String fullDirPath = OMFileRequest.getAbsolutePath(prefixKeyPath,
+              cacheOmDirInfo.getName());
+      OmKeyInfo cacheDirKeyInfo = OMFileRequest.getOmKeyInfo(volumeName,
+              bucketName, cacheOmDirInfo, fullDirPath);
+
+      countEntries = addKeyInfoToFileStatusList(fileStatusList, prefixKeyInDB,
+              seekKeyInDB, startKey, countEntries, cacheKey, cacheDirKeyInfo,
+              true);
+    }
+    return countEntries;
+  }
+
+  @SuppressWarnings("parameternumber")
+  private int addKeyInfoToFileStatusList(Set<OzoneFileStatus> fileStatusList,
+      long prefixKeyInDB, String seekKeyInDB, String startKey,
+      int countEntries, String cacheKey, OmKeyInfo cacheOmKeyInfo,
+      boolean isDirectory) {
+    // seekKeyInDB will have two type of values.
+    // 1. "1024/"   -> startKey is null or empty
+    // 2. "1024/b"  -> startKey exists
+    if (StringUtils.isBlank(startKey)) {
+      // startKey is null or empty, then the seekKeyInDB="1024/"
+      if (cacheKey.startsWith(seekKeyInDB)) {
+        OzoneFileStatus fileStatus = new OzoneFileStatus(cacheOmKeyInfo,
+                scmBlockSize, isDirectory);
+        fileStatusList.add(fileStatus);
+        countEntries++;
+      }
+    } else {
+      // startKey not empty, then the seekKeyInDB="1024/b" and
+      // seekKeyInDBWithOnlyParentID = "1024/". This is to avoid case of
+      // parentID with "102444" cache entries.
+      // Here, it has to list all the keys after "1024/b" and requires >=0
+      // string comparison.
+      String seekKeyInDBWithOnlyParentID = prefixKeyInDB + OM_KEY_PREFIX;
+      if (cacheKey.startsWith(seekKeyInDBWithOnlyParentID) &&
+              cacheKey.compareTo(seekKeyInDB) >= 0) {
+        OzoneFileStatus fileStatus = new OzoneFileStatus(cacheOmKeyInfo,
+                scmBlockSize, isDirectory);
+        fileStatusList.add(fileStatus);
+        countEntries++;
+      }
+    }
+    return countEntries;
+  }
+
   private String getNextGreaterString(String volumeName, String bucketName,
       String keyPrefix) throws IOException {
     // Increment the last character of the string and return the new ozone key.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java
index c48ff78..fa66766 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestV1.java
@@ -232,7 +232,6 @@ public class OMDirectoryCreateRequestV1 extends OMDirectoryCreateRequest {
   /**
    * Construct OmDirectoryInfo for every parent directory in missing list.
    *
-   * @param ozoneManager Ozone Manager
    * @param keyArgs      key arguments
    * @param pathInfo     list of parent directories to be created and its ACLs
    * @param trxnLogIndex transaction log index id
@@ -243,7 +242,6 @@ public class OMDirectoryCreateRequestV1 extends OMDirectoryCreateRequest {
           OzoneManager ozoneManager, KeyArgs keyArgs,
           OMFileRequest.OMPathInfoV1 pathInfo, long trxnLogIndex)
           throws IOException {
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     List<OmDirectoryInfo> missingParentInfos = new ArrayList<>();
 
     // The base id is left shifted by 8 bits for creating space to
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
index ce8d49b..5225d82 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
@@ -20,11 +20,15 @@ package org.apache.hadoop.ozone.om.request.file;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -34,7 +38,10 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -432,7 +439,6 @@ public final class OMFileRequest {
    * @param omFileInfo        key info
    * @param fileName          file name
    * @param trxnLogIndex      transaction log index
-   * @return dbOmFileInfo, which keeps leaf node name in keyName field
    */
   public static void addOpenFileTableCacheEntry(
           OMMetadataManager omMetadataManager, String dbOpenFileName,
@@ -460,7 +466,6 @@ public final class OMFileRequest {
    * @param omFileInfo        key info
    * @param fileName          file name
    * @param trxnLogIndex      transaction log index
-   * @return dbOmFileInfo, which keeps leaf node name in keyName field
    */
   public static void addFileTableCacheEntry(
           OMMetadataManager omMetadataManager, String dbFileKey,
@@ -552,4 +557,112 @@ public final class OMFileRequest {
     return dbOmKeyInfo;
   }
 
+  /**
+   * Gets OmKeyInfo if exists for the given key name in the DB.
+   *
+   * @param omMetadataMgr metadata manager
+   * @param volumeName    volume name
+   * @param bucketName    bucket name
+   * @param keyName       key name
+   * @param scmBlockSize  scm block size
+   * @return OzoneFileStatus
+   * @throws IOException DB failure
+   */
+  @Nullable
+  public static OzoneFileStatus getOMKeyInfoIfExists(
+      OMMetadataManager omMetadataMgr, String volumeName, String bucketName,
+      String keyName, long scmBlockSize) throws IOException {
+
+    Path keyPath = Paths.get(keyName);
+    Iterator<Path> elements = keyPath.iterator();
+    String bucketKey = omMetadataMgr.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo =
+            omMetadataMgr.getBucketTable().get(bucketKey);
+
+    long lastKnownParentId = omBucketInfo.getObjectID();
+    OmDirectoryInfo omDirInfo = null;
+    while (elements.hasNext()) {
+      String fileName = elements.next().toString();
+
+      // For example, /vol1/buck1/a/b/c/d/e/file1.txt
+      // 1. Do lookup path component on directoryTable starting from bucket
+      // 'buck1' to the leaf node component, which is 'file1.txt'.
+      // 2. If there is no dir exists for the leaf node component 'file1.txt'
+      // then do look it on fileTable.
+      String dbNodeName = omMetadataMgr.getOzonePathKey(
+              lastKnownParentId, fileName);
+      omDirInfo = omMetadataMgr.getDirectoryTable().get(dbNodeName);
+
+      if (omDirInfo != null) {
+        lastKnownParentId = omDirInfo.getObjectID();
+      } else if (!elements.hasNext()) {
+        // reached last path component. Check file exists for the given path.
+        OmKeyInfo omKeyInfo = OMFileRequest.getOmKeyInfoFromFileTable(false,
+                omMetadataMgr, dbNodeName, keyName);
+        if (omKeyInfo != null) {
+          return new OzoneFileStatus(omKeyInfo, scmBlockSize, false);
+        }
+      } else {
+        // Missing intermediate directory and just return null;
+        // key not found in DB
+        return null;
+      }
+    }
+
+    if (omDirInfo != null) {
+      OmKeyInfo omKeyInfo = getOmKeyInfo(volumeName, bucketName, omDirInfo,
+              keyName);
+      return new OzoneFileStatus(omKeyInfo, scmBlockSize, true);
+    }
+
+    // key not found in DB
+    return null;
+  }
+
+  /**
+   * Prepare OmKeyInfo from OmDirectoryInfo.
+   *
+   * @param volumeName volume name
+   * @param bucketName bucket name
+   * @param dirInfo    directory info
+   * @param keyName    user given key name
+   * @return OmKeyInfo object
+   */
+  @NotNull
+  public static OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
+      OmDirectoryInfo dirInfo, String keyName) {
+
+    OmKeyInfo.Builder builder = new OmKeyInfo.Builder();
+    builder.setParentObjectID(dirInfo.getParentObjectID());
+    builder.setKeyName(keyName);
+    builder.setAcls(dirInfo.getAcls());
+    builder.addAllMetadata(dirInfo.getMetadata());
+    builder.setVolumeName(volumeName);
+    builder.setBucketName(bucketName);
+    builder.setCreationTime(dirInfo.getCreationTime());
+    builder.setModificationTime(dirInfo.getModificationTime());
+    builder.setObjectID(dirInfo.getObjectID());
+    builder.setUpdateID(dirInfo.getUpdateID());
+    builder.setFileName(dirInfo.getName());
+    builder.setReplicationType(HddsProtos.ReplicationType.RATIS);
+    builder.setReplicationFactor(HddsProtos.ReplicationFactor.ONE);
+    builder.setOmKeyLocationInfos(Collections.singletonList(
+            new OmKeyLocationInfoGroup(0, new ArrayList<>())));
+    return builder.build();
+  }
+
+  /**
+   * Returns absolute path.
+   *
+   * @param prefixName prefix path
+   * @param fileName   file name
+   * @return absolute path
+   */
+  @NotNull
+  public static String getAbsolutePath(String prefixName, String fileName) {
+    if (Strings.isNullOrEmpty(prefixName)) {
+      return fileName;
+    }
+    return prefixName.concat(OzoneConsts.OZONE_URI_DELIMITER).concat(fileName);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org