Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:22:44 UTC

[GitHub] [hadoop-ozone] elek commented on a change in pull request #1021: HDDS-2665. Implement new Ozone Filesystem scheme ofs://

elek commented on a change in pull request #1021:
URL: https://github.com/apache/hadoop-ozone/pull/1021#discussion_r436736394



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
##########
@@ -0,0 +1,876 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.OzoneAclConfig;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
+
+/**
+ * Ozone file system tests that are not covered by contract tests.
+ * TODO: Refactor this and TestOzoneFileSystem later to reduce code duplication.
+ */
+public class TestRootedOzoneFileSystem {
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300_000);
+
+  private OzoneConfiguration conf;
+  private MiniOzoneCluster cluster = null;
+  private FileSystem fs;
+  private RootedOzoneFileSystem ofs;
+  private ObjectStore objectStore;
+  private static BasicRootedOzoneClientAdapterImpl adapter;
+
+  private String volumeName;
+  private String bucketName;
+  // Store path commonly used by tests that test functionality within a bucket
+  private Path testBucketPath;
+  private String rootPath;
+
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+    objectStore = cluster.getClient().getObjectStore();
+
+    // create a volume and a bucket to be used by RootedOzoneFileSystem (OFS)
+    OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(cluster);
+    volumeName = bucket.getVolumeName();
+    bucketName = bucket.getName();
+    String testBucketStr =
+        OZONE_URI_DELIMITER + volumeName + OZONE_URI_DELIMITER + bucketName;
+    testBucketPath = new Path(testBucketStr);
+
+    rootPath = String.format("%s://%s/",
+        OzoneConsts.OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+
+    // Set the fs.defaultFS and start the filesystem
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Note: FileSystem#loadFileSystems won't load the OFS class because it
+    //  is not registered in META-INF/services yet, hence this workaround.
+    conf.set("fs.ofs.impl", "org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
+    fs = FileSystem.get(conf);
+    ofs = (RootedOzoneFileSystem) fs;
+    adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
+  }
+
+  @After
+  public void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.closeQuietly(fs);
+  }
+
+  @Test
+  public void testOzoneFsServiceLoader() throws IOException {
+    OzoneConfiguration confTestLoader = new OzoneConfiguration();
+    // Note: FileSystem#loadFileSystems won't load the OFS class because it
+    //  is not registered in META-INF/services yet, hence this workaround.
+    confTestLoader.set("fs.ofs.impl",
+        "org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
+    Assert.assertEquals(FileSystem.getFileSystemClass(
+        OzoneConsts.OZONE_OFS_URI_SCHEME, confTestLoader),
+        RootedOzoneFileSystem.class);
+  }
+
+  @Test
+  public void testCreateDoesNotAddParentDirKeys() throws Exception {
+    Path grandparent = new Path(testBucketPath,
+        "testCreateDoesNotAddParentDirKeys");
+    Path parent = new Path(grandparent, "parent");
+    Path child = new Path(parent, "child");
+    ContractTestUtils.touch(fs, child);
+
+    OzoneKeyDetails key = getKey(child, false);
+    OFSPath childOFSPath = new OFSPath(child);
+    Assert.assertEquals(key.getName(), childOFSPath.getKeyName());
+
+    // Creating a child should not add parent keys to the bucket
+    try {
+      getKey(parent, true);
+    } catch (IOException ex) {
+      assertKeyNotFoundException(ex);
+    }
+
+    // List status on the parent should show the child file
+    Assert.assertEquals(
+        "List status of parent should include the 1 child file",
+        1L, fs.listStatus(parent).length);
+    Assert.assertTrue(
+        "Parent directory does not appear to be a directory",
+        fs.getFileStatus(parent).isDirectory());
+  }
+
+  @Test
+  public void testDeleteCreatesFakeParentDir() throws Exception {
+    Path grandparent = new Path(testBucketPath,
+        "testDeleteCreatesFakeParentDir");
+    Path parent = new Path(grandparent, "parent");
+    Path child = new Path(parent, "child");
+    ContractTestUtils.touch(fs, child);
+
+    // Verify that parent dir key does not exist
+    // Creating a child should not add parent keys to the bucket
+    try {
+      getKey(parent, true);
+    } catch (IOException ex) {
+      assertKeyNotFoundException(ex);
+    }
+
+    // Delete the child key
+    Assert.assertTrue(fs.delete(child, false));
+
+    // Deleting the only child should create the parent dir key if it does
+    // not exist
+    OFSPath parentOFSPath = new OFSPath(parent);
+    String parentKey = parentOFSPath.getKeyName() + "/";
+    OzoneKeyDetails parentKeyInfo = getKey(parent, true);
+    Assert.assertEquals(parentKey, parentKeyInfo.getName());
+
+    // Recursive delete with DeleteIterator
+    Assert.assertTrue(fs.delete(grandparent, true));
+  }
+
+  @Test
+  public void testListStatus() throws Exception {
+    Path parent = new Path(testBucketPath, "testListStatus");
+    Path file1 = new Path(parent, "key1");
+    Path file2 = new Path(parent, "key2");
+
+    FileStatus[] fileStatuses = ofs.listStatus(testBucketPath);
+    Assert.assertEquals("Should be empty", 0, fileStatuses.length);
+
+    ContractTestUtils.touch(fs, file1);
+    ContractTestUtils.touch(fs, file2);
+
+    fileStatuses = ofs.listStatus(testBucketPath);
+    Assert.assertEquals("Should have created parent",
+        1, fileStatuses.length);
+    Assert.assertEquals("Parent path doesn't match",
+        fileStatuses[0].getPath().toUri().getPath(), parent.toString());
+
+    // ListStatus on a directory should return all subdirs along with
+    // files, even if there exists a file and sub-dir with the same name.
+    fileStatuses = ofs.listStatus(parent);
+    Assert.assertEquals(
+        "FileStatus did not return all children of the directory",
+        2, fileStatuses.length);
+
+    // ListStatus should return only the immediate children of a directory.
+    Path file3 = new Path(parent, "dir1/key3");
+    Path file4 = new Path(parent, "dir1/key4");
+    ContractTestUtils.touch(fs, file3);
+    ContractTestUtils.touch(fs, file4);
+    fileStatuses = ofs.listStatus(parent);
+    Assert.assertEquals(
+        "FileStatus did not return all children of the directory",
+        3, fileStatuses.length);
+  }
+
+  /**
+   * OFS: Helper function for tests. Return a volume name that doesn't exist.
+   */
+  private String getRandomNonExistVolumeName() throws IOException {
+    final int numDigit = 5;
+    long retriesLeft = Math.round(Math.pow(10, numDigit));
+    String name = null;
+    while (name == null && retriesLeft-- > 0) {
+      name = "volume-" + RandomStringUtils.randomNumeric(numDigit);
+      // Check volume existence.
+      Iterator<? extends OzoneVolume> iter =
+          objectStore.listVolumesByUser(null, name, null);
+      if (iter.hasNext()) {
+        // If there is a match, try again.
+        // Note that a volume name prefix match doesn't prove the volume
+        //  exists, but the check is sufficient for this test.
+        name = null;
+      }
+    }
+    if (name == null) {
+      Assert.fail(
+          "Failed to generate random volume name that doesn't exist already.");
+    }
+    return name;
+  }
+
+  /**
+   * OFS: Test mkdir on volume, bucket and dir that doesn't exist.
+   */
+  @Test
+  public void testMkdirOnNonExistentVolumeBucketDir() throws Exception {
+    String volumeNameLocal = getRandomNonExistVolumeName();
+    String bucketNameLocal = "bucket-" + RandomStringUtils.randomNumeric(5);
+    Path root = new Path("/" + volumeNameLocal + "/" + bucketNameLocal);
+    Path dir1 = new Path(root, "dir1");
+    Path dir12 = new Path(dir1, "dir12");
+    Path dir2 = new Path(root, "dir2");
+    fs.mkdirs(dir12);
+    fs.mkdirs(dir2);
+
+    // Check volume and bucket existence, they should both be created.
+    OzoneVolume ozoneVolume = objectStore.getVolume(volumeNameLocal);
+    OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketNameLocal);
+    OFSPath ofsPathDir1 = new OFSPath(dir12);
+    String key = ofsPathDir1.getKeyName() + "/";
+    OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(key);
+    Assert.assertEquals(key, ozoneKeyDetails.getName());
+
+    // Verify that directories are created.
+    FileStatus[] fileStatuses = ofs.listStatus(root);
+    Assert.assertEquals(
+        fileStatuses[0].getPath().toUri().getPath(), dir1.toString());
+    Assert.assertEquals(
+        fileStatuses[1].getPath().toUri().getPath(), dir2.toString());
+
+    fileStatuses = ofs.listStatus(dir1);
+    Assert.assertEquals(
+        fileStatuses[0].getPath().toUri().getPath(), dir12.toString());
+    fileStatuses = ofs.listStatus(dir12);
+    Assert.assertEquals(fileStatuses.length, 0);
+    fileStatuses = ofs.listStatus(dir2);
+    Assert.assertEquals(fileStatuses.length, 0);
+  }
+
+  /**
+   * OFS: Test mkdir on a volume and bucket that doesn't exist.
+   */
+  @Test
+  public void testMkdirNonExistentVolumeBucket() throws Exception {
+    String volumeNameLocal = getRandomNonExistVolumeName();
+    String bucketNameLocal = "bucket-" + RandomStringUtils.randomNumeric(5);
+    Path newVolBucket = new Path(
+        "/" + volumeNameLocal + "/" + bucketNameLocal);
+    fs.mkdirs(newVolBucket);
+
+    // Verify with listVolumes and listBuckets
+    Iterator<? extends OzoneVolume> iterVol =
+        objectStore.listVolumesByUser(null, volumeNameLocal, null);
+    OzoneVolume ozoneVolume = iterVol.next();
+    Assert.assertNotNull(ozoneVolume);
+    Assert.assertEquals(volumeNameLocal, ozoneVolume.getName());
+
+    Iterator<? extends OzoneBucket> iterBuc =
+        ozoneVolume.listBuckets("bucket-");
+    OzoneBucket ozoneBucket = iterBuc.next();
+    Assert.assertNotNull(ozoneBucket);
+    Assert.assertEquals(bucketNameLocal, ozoneBucket.getName());
+
+    // TODO: Use listStatus to check volume and bucket creation in HDDS-2928.
+  }
+
+  /**
+   * OFS: Test mkdir on a volume that doesn't exist.
+   */
+  @Test
+  public void testMkdirNonExistentVolume() throws Exception {
+    String volumeNameLocal = getRandomNonExistVolumeName();
+    Path newVolume = new Path("/" + volumeNameLocal);
+    fs.mkdirs(newVolume);
+
+    // Verify with listVolumes and listBuckets
+    Iterator<? extends OzoneVolume> iterVol =
+        objectStore.listVolumesByUser(null, volumeNameLocal, null);
+    OzoneVolume ozoneVolume = iterVol.next();
+    Assert.assertNotNull(ozoneVolume);
+    Assert.assertEquals(volumeNameLocal, ozoneVolume.getName());
+
+    // TODO: Use listStatus to check volume and bucket creation in HDDS-2928.
+  }
+
+  /**
+   * OFS: Test getFileStatus on root.
+   */
+  @Test
+  public void testGetFileStatusRoot() throws Exception {
+    Path root = new Path("/");
+    FileStatus fileStatus = fs.getFileStatus(root);
+    Assert.assertNotNull(fileStatus);
+    Assert.assertEquals(new Path(rootPath), fileStatus.getPath());
+    Assert.assertTrue(fileStatus.isDirectory());
+    Assert.assertEquals(FsPermission.getDirDefault(),
+        fileStatus.getPermission());
+  }
+
+  /**
+   * Test listStatus operation in a bucket.
+   */
+  @Test
+  public void testListStatusInBucket() throws Exception {
+    Path root = new Path("/" + volumeName + "/" + bucketName);
+    Path dir1 = new Path(root, "dir1");
+    Path dir12 = new Path(dir1, "dir12");
+    Path dir2 = new Path(root, "dir2");
+    fs.mkdirs(dir12);
+    fs.mkdirs(dir2);
+
+    // ListStatus on root should return dir1 (even though /dir1 key does not
+    // exist) and dir2 only. dir12 is not an immediate child of root and
+    // hence should not be listed.
+    FileStatus[] fileStatuses = ofs.listStatus(root);
+    Assert.assertEquals(
+        "FileStatus should return only the immediate children",
+        2, fileStatuses.length);
+
+    // Verify that dir12 is not included in the result of the listStatus on root
+    String fileStatus1 = fileStatuses[0].getPath().toUri().getPath();
+    String fileStatus2 = fileStatuses[1].getPath().toUri().getPath();
+    Assert.assertNotEquals(fileStatus1, dir12.toString());
+    Assert.assertNotEquals(fileStatus2, dir12.toString());
+  }
+
+  /**
+   * Tests listStatus operation on root directory.
+   */
+  @Test
+  public void testListStatusOnLargeDirectory() throws Exception {
+    Path root = new Path("/" + volumeName + "/" + bucketName);
+    Set<String> paths = new TreeSet<>();
+    int numDirs = LISTING_PAGE_SIZE + LISTING_PAGE_SIZE / 2;
+    for (int i = 0; i < numDirs; i++) {
+      Path p = new Path(root, String.valueOf(i));
+      fs.mkdirs(p);
+      paths.add(p.getName());
+    }
+
+    FileStatus[] fileStatuses = ofs.listStatus(root);
+    Assert.assertEquals(
+        "Total directories listed do not match the existing directories",
+        numDirs, fileStatuses.length);
+
+    for (int i = 0; i < numDirs; i++) {
+      Assert.assertTrue(paths.contains(fileStatuses[i].getPath().getName()));
+    }
+  }
+
+  /**
+   * Tests listStatus on a path with subdirs.
+   */
+  @Test
+  public void testListStatusOnSubDirs() throws Exception {
+    // Create the following key structure
+    //      /dir1/dir11/dir111
+    //      /dir1/dir12
+    //      /dir1/dir12/file121
+    //      /dir2
+    // ListStatus on /dir1 should return only its immediate subdirs,
+    // namely /dir1/dir11 and /dir1/dir12. Deeper descendants
+    // (/dir1/dir12/file121 and /dir1/dir11/dir111) should not be returned
+    // by listStatus.
+    Path dir1 = new Path(testBucketPath, "dir1");
+    Path dir11 = new Path(dir1, "dir11");
+    Path dir111 = new Path(dir11, "dir111");
+    Path dir12 = new Path(dir1, "dir12");
+    Path file121 = new Path(dir12, "file121");
+    Path dir2 = new Path(testBucketPath, "dir2");
+    fs.mkdirs(dir111);
+    fs.mkdirs(dir12);
+    ContractTestUtils.touch(fs, file121);
+    fs.mkdirs(dir2);
+
+    FileStatus[] fileStatuses = ofs.listStatus(dir1);
+    Assert.assertEquals(
+        "FileStatus should return only the immediate children",
+        2, fileStatuses.length);
+
+    // Verify that the two children of /dir1 returned by listStatus operation
+    // are /dir1/dir11 and /dir1/dir12.
+    String fileStatus1 = fileStatuses[0].getPath().toUri().getPath();
+    String fileStatus2 = fileStatuses[1].getPath().toUri().getPath();
+    Assert.assertTrue(fileStatus1.equals(dir11.toString()) ||
+        fileStatus1.equals(dir12.toString()));
+    Assert.assertTrue(fileStatus2.equals(dir11.toString()) ||
+        fileStatus2.equals(dir12.toString()));
+  }
+
+  @Test
+  public void testNonExplicitlyCreatedPathExistsAfterItsLeafsWereRemoved()
+      throws Exception {
+    Path source = new Path(testBucketPath, "source");
+    Path interimPath = new Path(source, "interimPath");
+    Path leafInsideInterimPath = new Path(interimPath, "leaf");
+    Path target = new Path(testBucketPath, "target");
+    Path leafInTarget = new Path(target, "leaf");
+
+    fs.mkdirs(source);
+    fs.mkdirs(target);
+    fs.mkdirs(leafInsideInterimPath);
+
+    Assert.assertTrue(fs.rename(leafInsideInterimPath, leafInTarget));
+
+    // after rename listStatus for interimPath should succeed and
+    // interimPath should have no children
+    FileStatus[] statuses = fs.listStatus(interimPath);
+    Assert.assertNotNull("liststatus returns a null array", statuses);
+    Assert.assertEquals("Statuses array is not empty", 0, statuses.length);
+    FileStatus fileStatus = fs.getFileStatus(interimPath);
+    Assert.assertEquals("FileStatus does not point to interimPath",
+        interimPath.getName(), fileStatus.getPath().getName());
+  }
+
+  /**
+   * OFS: Try to rename a key to a different bucket. The attempt should fail.
+   */
+  @Test
+  public void testRenameToDifferentBucket() throws IOException {
+    Path source = new Path(testBucketPath, "source");
+    Path interimPath = new Path(source, "interimPath");
+    Path leafInsideInterimPath = new Path(interimPath, "leaf");
+    Path target = new Path(testBucketPath, "target");
+
+    fs.mkdirs(source);
+    fs.mkdirs(target);
+    fs.mkdirs(leafInsideInterimPath);
+
+    // Attempt to rename the key to a different bucket
+    Path bucket2 = new Path(OZONE_URI_DELIMITER + volumeName +
+        OZONE_URI_DELIMITER + bucketName + "test");
+    Path leafInTargetInAnotherBucket = new Path(bucket2, "leaf");
+    try {
+      fs.rename(leafInsideInterimPath, leafInTargetInAnotherBucket);
+      Assert.fail(
+          "Should have thrown exception when renaming to a different bucket");
+    } catch (IOException ignored) {
+      // Test passed. Exception thrown as expected.
+    }
+  }
+
+  private OzoneKeyDetails getKey(Path keyPath, boolean isDirectory)
+      throws IOException {
+    String key = ofs.pathToKey(keyPath);
+    if (isDirectory) {
+      key = key + OZONE_URI_DELIMITER;
+    }
+    OFSPath ofsPath = new OFSPath(key);
+    String keyInBucket = ofsPath.getKeyName();
+    return cluster.getClient().getObjectStore().getVolume(volumeName)
+        .getBucket(bucketName).getKey(keyInBucket);
+  }
+
+  private void assertKeyNotFoundException(IOException ex) {
+    GenericTestUtils.assertExceptionContains("KEY_NOT_FOUND", ex);
+  }
+
+  /**
+   * Helper function for testListStatusRootAndVolume*.
+   * Each call creates one volume, one bucket under that volume,
+   * two dir under that bucket, one subdir under one of the dirs,
+   * and one file under the subdir.
+   */
+  private Path createRandomVolumeBucketWithDirs() throws IOException {
+    String volume1 = getRandomNonExistVolumeName();
+    String bucket1 = "bucket-" + RandomStringUtils.randomNumeric(5);
+    Path bucketPath1 = new Path(
+        OZONE_URI_DELIMITER + volume1 + OZONE_URI_DELIMITER + bucket1);
+
+    Path dir1 = new Path(bucketPath1, "dir1");
+    fs.mkdirs(dir1);  // Intentionally creating this "in-the-middle" dir key
+    Path subdir1 = new Path(dir1, "subdir1");
+    fs.mkdirs(subdir1);
+    Path dir2 = new Path(bucketPath1, "dir2");
+    fs.mkdirs(dir2);
+
+    try (FSDataOutputStream stream =
+        ofs.create(new Path(dir2, "file1"))) {
+      stream.write(1);
+    }
+
+    return bucketPath1;
+  }
+
+  /**
+   * OFS: Test non-recursive listStatus on root and volume.
+   */
+  @Test
+  public void testListStatusRootAndVolumeNonRecursive() throws Exception {
+    Path bucketPath1 = createRandomVolumeBucketWithDirs();
+    createRandomVolumeBucketWithDirs();
+    // listStatus("/volume/bucket")
+    FileStatus[] fileStatusBucket = ofs.listStatus(bucketPath1);
+    Assert.assertEquals(2, fileStatusBucket.length);
+    // listStatus("/volume")
+    Path volume = new Path(
+        OZONE_URI_DELIMITER + new OFSPath(bucketPath1).getVolumeName());
+    FileStatus[] fileStatusVolume = ofs.listStatus(volume);
+    Assert.assertEquals(1, fileStatusVolume.length);
+    // listStatus("/")
+    Path root = new Path(OZONE_URI_DELIMITER);
+    FileStatus[] fileStatusRoot = ofs.listStatus(root);
+    Assert.assertEquals(2, fileStatusRoot.length);
+  }
+
+  /**
+   * Helper function to do FileSystem#listStatus recursively.
+   * Simulates what FsShell does, using depth-first traversal.
+   */
+  private void listStatusRecursiveHelper(Path curPath, List<FileStatus> result)
+      throws IOException {
+    FileStatus[] startList = ofs.listStatus(curPath);
+    for (FileStatus fileStatus : startList) {
+      result.add(fileStatus);
+      if (fileStatus.isDirectory()) {
+        Path nextPath = fileStatus.getPath();
+        listStatusRecursiveHelper(nextPath, result);
+      }
+    }
+  }
+
+  /**
+   * Helper function to call listStatus in adapter implementation.
+   */
+  private List<FileStatus> callAdapterListStatus(String pathStr,
+      boolean recursive, String startPath, long numEntries) throws IOException {
+    return adapter.listStatus(pathStr, recursive, startPath, numEntries,
+        ofs.getUri(), ofs.getWorkingDirectory(), ofs.getUsername())
+        .stream().map(ofs::convertFileStatus).collect(Collectors.toList());
+  }
+
+  /**
+   * Helper function to compare recursive listStatus results from adapter
+   * and (simulated) FileSystem.
+   */
+  private void listStatusCheckHelper(Path path) throws IOException {
+    // Get recursive listStatus result directly from adapter impl
+    List<FileStatus> statusesFromAdapter = callAdapterListStatus(
+        path.toString(), true, "", 1000);
+    // Get recursive listStatus result with FileSystem API by simulating FsShell
+    List<FileStatus> statusesFromFS = new ArrayList<>();
+    listStatusRecursiveHelper(path, statusesFromFS);
+    // Compare. The results would be in the same order due to assumptions:
+    // 1. They both traverse depth-first internally;
+    // 2. They both return ordered results.
+    Assert.assertEquals(statusesFromAdapter.size(), statusesFromFS.size());
+    final int n = statusesFromFS.size();
+    for (int i = 0; i < n; i++) {
+      FileStatus statusFromAdapter = statusesFromAdapter.get(i);
+      FileStatus statusFromFS = statusesFromFS.get(i);
+      Assert.assertEquals(statusFromAdapter.getPath(), statusFromFS.getPath());
+      Assert.assertEquals(statusFromAdapter.getLen(), statusFromFS.getLen());
+      Assert.assertEquals(statusFromAdapter.isDirectory(),
+          statusFromFS.isDirectory());
+      // TODO: When HDDS-3054 is in, uncomment the lines below.

Review comment:
       HDDS-3054 seems to be resolved as far as I see.

##########
File path: hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystemWithMocks.java
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Ozone File system tests that are light weight and use mocks.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ OzoneClientFactory.class, UserGroupInformation.class })
+@PowerMockIgnore("javax.management.*")
+public class TestRootedOzoneFileSystemWithMocks {

Review comment:
       FYI: there is a full, in-memory implementation of `ObjectStore` in the `s3` project. It can be useful for similar tests if we move it to a common place. (BTW: I like this lightweight test.)
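
   For illustration only, a rough sketch of how such a lightweight test could look once the in-memory store is moved to a shared test module (the class name `ObjectStoreStub` comes from the s3 gateway tests; the no-arg constructor and the wiring below are assumptions, not a confirmed API):

   ```java
   // Hypothetical: assumes the s3 gateway's in-memory ObjectStore stub has
   // been moved to a common test-utils module.
   ObjectStore store = new ObjectStoreStub();
   store.createVolume("volume1");
   OzoneVolume volume = store.getVolume("volume1");
   volume.createBucket("bucket1");
   // The filesystem adapter under test could then be wired against 'store'
   // without spinning up a MiniOzoneCluster.
   ```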

##########
File path: hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
##########
@@ -0,0 +1,904 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+
+/**
+ * The minimal Ozone Filesystem implementation.
+ * <p>
+ * This is a basic version which doesn't extend
+ * KeyProviderTokenIssuer and doesn't include statistics. It can be used
+ * with older Hadoop versions. For newer Hadoop versions, use the full
+ * featured RootedOzoneFileSystem.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BasicRootedOzoneFileSystem extends FileSystem {
+  static final Logger LOG =
+      LoggerFactory.getLogger(BasicRootedOzoneFileSystem.class);
+
+  /**
+   * The Ozone client for connecting to Ozone server.
+   */
+
+  private URI uri;
+  private String userName;
+  private Path workingDir;
+  private OzoneClientAdapter adapter;
+  private BasicRootedOzoneClientAdapterImpl adapterImpl;

Review comment:
       Why don't we use the interface instead of the implementation?
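
   For example, a minimal sketch of one way to do that, assuming the impl-only call sites just need `getBucket` (the signature mirrors this PR's usage; promoting it to the interface is a suggestion, not existing API):

   ```java
   public interface OzoneClientAdapter {
     // ... existing methods ...

     // Hypothetical addition: promote the bucket lookup into the interface
     // so the filesystem never needs a cast to the concrete adapter class.
     OzoneBucket getBucket(OFSPath ofsPath, boolean createIfNotExist)
         throws IOException;
   }
   ```

   With that, the `adapterImpl` field and the casts to `BasicRootedOzoneClientAdapterImpl` could be dropped.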

##########
File path: hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
##########
@@ -0,0 +1,904 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_EMPTY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_EMPTY;
+
+/**
+ * The minimal Ozone Filesystem implementation.
+ * <p>
+ * This is a basic version which doesn't extend
+ * KeyProviderTokenIssuer and doesn't include statistics. It can be used
+ * with older Hadoop versions. For newer Hadoop versions, use the full
+ * featured RootedOzoneFileSystem.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BasicRootedOzoneFileSystem extends FileSystem {
+  static final Logger LOG =
+      LoggerFactory.getLogger(BasicRootedOzoneFileSystem.class);
+
+  /**
+   * The Ozone client for connecting to Ozone server.
+   */
+
+  private URI uri;
+  private String userName;
+  private Path workingDir;
+  private OzoneClientAdapter adapter;
+  private BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+  private static final String URI_EXCEPTION_TEXT =
+      "URL should be one of the following formats: " +
+      "ofs://om-service-id/path/to/key  OR " +
+      "ofs://om-host.example.com/path/to/key  OR " +
+      "ofs://om-host.example.com:5678/path/to/key";
+
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+    setConf(conf);
+    Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
+    Preconditions.checkArgument(getScheme().equals(name.getScheme()),
+        "Invalid scheme provided in " + name);
+
+    String authority = name.getAuthority();
+    if (authority == null) {
+      // authority is null when fs.defaultFS is not a qualified ofs URI and
+      // ofs:/// is passed to the client. The parsing below would NPE on a
+      // null authority.
+      throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+    }
+
+    String omHostOrServiceId;
+    int omPort = -1;
+    // Parse hostname and port
+    String[] parts = authority.split(":");
+    if (parts.length > 2) {
+      throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+    }
+    omHostOrServiceId = parts[0];
+    if (parts.length == 2) {
+      try {
+        omPort = Integer.parseInt(parts[1]);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
+      }
+    }
+
+    try {
+      uri = new URIBuilder().setScheme(OZONE_OFS_URI_SCHEME)
+          .setHost(authority)
+          .build();
+      LOG.trace("Ozone URI for OFS initialization is " + uri);
+
+      // Isolated classloading defaults to true for ozonefs-lib-legacy, which
+      // bundles /ozonefs.txt; otherwise it defaults to false. It can be
+      // overridden in configuration.
+      boolean defaultValue =
+          BasicRootedOzoneFileSystem.class.getClassLoader()
+              .getResource("ozonefs.txt") != null;
+
+      // Use the string literal here instead of the constant, as the constant
+      // may not be available on the classpath of Hadoop 2.7.
+      boolean isolatedClassloader =
+          conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
+
+      ConfigurationSource source;
+      if (conf instanceof OzoneConfiguration) {
+        source = (ConfigurationSource) conf;
+      } else {
+        source = new LegacyHadoopConfigurationSource(conf);
+      }
+      this.adapter =
+          createAdapter(source,
+              omHostOrServiceId, omPort,
+              isolatedClassloader);
+      this.adapterImpl = (BasicRootedOzoneClientAdapterImpl) this.adapter;
+
+      try {
+        this.userName =
+            UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        this.userName = OZONE_DEFAULT_USER;
+      }
+      this.workingDir = new Path(OZONE_USER_DIR, this.userName)
+          .makeQualified(this.uri, this.workingDir);
+    } catch (URISyntaxException ue) {
+      final String msg = "Invalid Ozone endpoint " + name;
+      LOG.error(msg, ue);
+      throw new IOException(msg, ue);
+    }
+  }
+
+  protected OzoneClientAdapter createAdapter(ConfigurationSource conf,
+      String omHost, int omPort, boolean isolatedClassloader)
+      throws IOException {
+
+    if (isolatedClassloader) {
+      return OzoneClientAdapterFactory.createAdapter();
+    } else {
+      return new BasicRootedOzoneClientAdapterImpl(omHost, omPort, conf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      adapter.close();
+    } finally {
+      super.close();
+    }
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public String getScheme() {
+    return OZONE_OFS_URI_SCHEME;
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    incrementCounter(Statistic.INVOCATION_OPEN);
+    statistics.incrementReadOps(1);
+    LOG.trace("open() path: {}", path);
+    final String key = pathToKey(path);
+    return new FSDataInputStream(
+        new OzoneFSInputStream(adapter.readFile(key), statistics));
+  }
+
+  protected void incrementCounter(Statistic statistic) {
+    //don't do anything in this default implementation.
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize,
+      short replication, long blockSize,
+      Progressable progress) throws IOException {
+    LOG.trace("create() path:{}", f);
+    incrementCounter(Statistic.INVOCATION_CREATE);
+    statistics.incrementWriteOps(1);
+    final String key = pathToKey(f);
+    return createOutputStream(key, replication, overwrite, true);
+  }
+
+  @Override
+  public FSDataOutputStream createNonRecursive(Path path,
+      FsPermission permission,
+      EnumSet<CreateFlag> flags,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
+    statistics.incrementWriteOps(1);
+    final String key = pathToKey(path);
+    return createOutputStream(key,
+        replication, flags.contains(CreateFlag.OVERWRITE), false);
+  }
+
+  private FSDataOutputStream createOutputStream(String key, short replication,
+      boolean overwrite, boolean recursive) throws IOException {
+    return new FSDataOutputStream(adapter.createFile(key,
+        replication, overwrite, recursive), statistics);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new UnsupportedOperationException("append() Not implemented by the "
+        + getClass().getSimpleName() + " FileSystem implementation");
+  }
+
+  private class RenameIterator extends OzoneListingIterator {
+    private final String srcPath;
+    private final String dstPath;
+    private final OzoneBucket bucket;
+    private final BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+    RenameIterator(Path srcPath, Path dstPath)
+        throws IOException {
+      super(srcPath);
+      this.srcPath = pathToKey(srcPath);
+      this.dstPath = pathToKey(dstPath);
+      LOG.trace("rename from:{} to:{}", this.srcPath, this.dstPath);
+      // Initialize bucket here to reduce number of RPC calls
+      OFSPath ofsPath = new OFSPath(srcPath);
+      // TODO: Refactor later.
+      adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
+      this.bucket = adapterImpl.getBucket(ofsPath, false);
+    }
+
+    @Override
+    boolean processKeyPath(String keyPath) throws IOException {
+      String newPath = dstPath.concat(keyPath.substring(srcPath.length()));
+      adapterImpl.rename(this.bucket, keyPath, newPath);
+      return true;
+    }
+  }
+
+  /**
+   * Check whether the source and destination path are valid and then perform
+   * rename from source path to destination path.
+   * <p>
+   * The rename operation is performed by renaming the keys with src as prefix.
+   * For such keys the prefix is changed from src to dst.
+   *
+   * @param src source path for rename
+   * @param dst destination path for rename
+   * @return true if rename operation succeeded or
+   * if the src and dst have the same path and are of the same type
+   * @throws IOException on I/O errors or if the src/dst paths are invalid.
+   */
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    incrementCounter(Statistic.INVOCATION_RENAME);
+    statistics.incrementWriteOps(1);
+    if (src.equals(dst)) {
+      return true;
+    }
+
+    LOG.trace("rename() from: {} to: {}", src, dst);
+    if (src.isRoot()) {
+      // Cannot rename root of file system
+      LOG.trace("Cannot rename the root of a filesystem");
+      return false;
+    }
+
+    // src and dst should be in the same bucket
+    OFSPath ofsSrc = new OFSPath(src);
+    OFSPath ofsDst = new OFSPath(dst);
+    if (!ofsSrc.isInSameBucketAs(ofsDst)) {
+      throw new IOException("Cannot rename a key to a different bucket");
+    }
+
+    // Cannot rename a directory to its own subdirectory
+    Path dstParent = dst.getParent();
+    while (dstParent != null && !src.equals(dstParent)) {
+      dstParent = dstParent.getParent();
+    }
+    Preconditions.checkArgument(dstParent == null,
+        "Cannot rename a directory to its own subdirectory");
+    // Check if the source exists
+    FileStatus srcStatus;
+    try {
+      srcStatus = getFileStatus(src);
+    } catch (FileNotFoundException fnfe) {
+      // source doesn't exist, return
+      return false;
+    }
+
+    // Check if the destination exists
+    FileStatus dstStatus;
+    try {
+      dstStatus = getFileStatus(dst);
+    } catch (FileNotFoundException fnde) {
+      dstStatus = null;
+    }
+
+    if (dstStatus == null) {
+      // If dst doesn't exist, check whether dst parent dir exists or not
+      // if the parent exists, the source can still be renamed to dst path
+      dstStatus = getFileStatus(dst.getParent());
+      if (!dstStatus.isDirectory()) {
+        throw new IOException(String.format(
+            "Failed to rename %s to %s, %s is a file", src, dst,
+            dst.getParent()));
+      }
+    } else {
+      // if dst exists and source and destination are same,
+      // check both the src and dst are of same type
+      if (srcStatus.getPath().equals(dstStatus.getPath())) {
+        return !srcStatus.isDirectory();
+      } else if (dstStatus.isDirectory()) {
+        // If dst is a directory, rename source as subpath of it.
+        // for example rename /source to /dst will lead to /dst/source
+        dst = new Path(dst, src.getName());
+        FileStatus[] statuses;
+        try {
+          statuses = listStatus(dst);
+        } catch (FileNotFoundException fnde) {
+          statuses = null;
+        }
+
+        if (statuses != null && statuses.length > 0) {
+          // dst exists as a non-empty directory; cannot rename onto it
+          throw new FileAlreadyExistsException(String.format(
+              "Failed to rename %s to %s, file already exists or not empty!",
+              src, dst));
+        }
+      } else {
+        // If dst is not a directory
+        throw new FileAlreadyExistsException(String.format(
+            "Failed to rename %s to %s, file already exists!", src, dst));
+      }
+    }
+
+    if (srcStatus.isDirectory()) {
+      if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
+        LOG.trace("Cannot rename a directory to a subdirectory of self");
+        return false;
+      }
+    }
+    RenameIterator iterator = new RenameIterator(src, dst);
+    boolean result = iterator.iterate();
+    if (result) {
+      createFakeParentDirectory(src);
+    }
+    return result;
+  }
+
+  private class DeleteIterator extends OzoneListingIterator {
+    private final boolean recursive;
+    private final OzoneBucket bucket;
+    private final BasicRootedOzoneClientAdapterImpl adapterImpl;
+
+    DeleteIterator(Path f, boolean recursive)
+        throws IOException {
+      super(f);
+      this.recursive = recursive;
+      if (getStatus().isDirectory()
+          && !this.recursive
+          && listStatus(f).length != 0) {
+        throw new PathIsNotEmptyDirectoryException(f.toString());
+      }
+      // Initialize bucket here to reduce number of RPC calls
+      OFSPath ofsPath = new OFSPath(f);
+      // TODO: Refactor later.
+      adapterImpl = (BasicRootedOzoneClientAdapterImpl) adapter;
+      this.bucket = adapterImpl.getBucket(ofsPath, false);
+    }
+
+    @Override
+    boolean processKeyPath(String keyPath) {
+      if (keyPath.equals("")) {
+        LOG.trace("Skipping deleting root directory");
+        return true;
+      } else {
+        LOG.trace("Deleting: {}", keyPath);
+        boolean succeed = adapterImpl.deleteObject(this.bucket, keyPath);
+        // if recursive delete is requested ignore the return value of
+        // deleteObject and issue deletes for other keys.
+        return recursive || succeed;
+      }
+    }
+  }
+
+  /**
+   * Deletes the children of the input dir path by iterating though the
+   * DeleteIterator.
+   *
+   * @param f directory path to be deleted
+   * @return true if successfully deletes all required keys, false otherwise
+   * @throws IOException
+   */
+  private boolean innerDelete(Path f, boolean recursive) throws IOException {
+    LOG.trace("delete() path:{} recursive:{}", f, recursive);
+    try {
+      DeleteIterator iterator = new DeleteIterator(f, recursive);
+      return iterator.iterate();
+    } catch (FileNotFoundException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete {} - does not exist", f);
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    incrementCounter(Statistic.INVOCATION_DELETE);
+    statistics.incrementWriteOps(1);
+    LOG.debug("Delete path {} - recursive {}", f, recursive);
+    FileStatus status;
+    try {
+      status = getFileStatus(f);
+    } catch (FileNotFoundException ex) {
+      LOG.warn("delete: Path does not exist: {}", f);
+      return false;
+    }
+
+    if (status == null) {
+      return false;
+    }
+
+    String key = pathToKey(f);
+    boolean result;
+
+    if (status.isDirectory()) {
+      LOG.debug("delete: Path is a directory: {}", f);
+      OFSPath ofsPath = new OFSPath(key);
+
+      // Handle rm root
+      if (ofsPath.isRoot()) {
+        // Intentionally drop support for rm root
+        // because it is too dangerous and doesn't provide much value
+        LOG.warn("delete: OFS does not support rm root. "
+            + "To wipe the cluster, please re-init OM instead.");
+        return false;
+      }
+
+      // Handle delete volume
+      if (ofsPath.isVolume()) {
+        String volumeName = ofsPath.getVolumeName();
+        if (recursive) {
+          // Delete all buckets first
+          OzoneVolume volume =
+              adapterImpl.getObjectStore().getVolume(volumeName);

Review comment:
       The main idea behind the adapter is that we only call methods on the adapter, which gives a clean definition of the Ozone methods in use. I would avoid the usage of getObjectStore() as it leaks the internal methods.

   The original goal of the adapter was to support different classloaders, but it still seems to be a good design pattern to use an `adapter.getVolume` instead.
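
   As a rough sketch of that direction (the method name `adapter.getVolume` is from the suggestion above; the exact signature is an assumption, not the current adapter API):

   ```java
   public interface OzoneClientAdapter {
     // ... existing methods ...

     // Hypothetical: expose the volume lookup directly instead of leaking
     // the whole ObjectStore through getObjectStore().
     OzoneVolume getVolume(String volumeName) throws IOException;
   }
   ```

   Callers would then write `adapter.getVolume(volumeName)` instead of `adapterImpl.getObjectStore().getVolume(volumeName)`.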




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-issues-help@hadoop.apache.org