You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by xy...@apache.org on 2020/02/28 19:03:48 UTC

[hadoop-ozone] branch HDDS-2665-ofs updated: HDDS-3073. Implement ofs://: Fix listStatus continuation (#606)

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch HDDS-2665-ofs
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-2665-ofs by this push:
     new 0610ce3  HDDS-3073. Implement ofs://: Fix listStatus continuation (#606)
0610ce3 is described below

commit 0610ce39c51a7eba728be418f3d6cd425cafe2ad
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Fri Feb 28 11:03:37 2020 -0800

    HDDS-3073. Implement ofs://: Fix listStatus continuation (#606)
---
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 93 +++++++++++++++++++---
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   | 28 +++++--
 .../java/org/apache/hadoop/fs/ozone/OFSPath.java   | 16 ++++
 .../org/apache/hadoop/fs/ozone/TestOFSPath.java    | 11 +++
 4 files changed, 131 insertions(+), 17 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index 033efdb..a881f10 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.ozone;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.junit.rules.Timeout;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -67,6 +69,7 @@ public class TestRootedOzoneFileSystem {
   private static FileSystem fs;
   private static RootedOzoneFileSystem ofs;
   private static ObjectStore objectStore;
+  private static BasicRootedOzoneClientAdapterImpl adapter;
 
   private String volumeName;
   private String bucketName;
@@ -101,6 +104,7 @@ public class TestRootedOzoneFileSystem {
     conf.set("fs.ofs.impl", "org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
     fs = FileSystem.get(conf);
     ofs = (RootedOzoneFileSystem) fs;
+    adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
   }
 
   @After
@@ -501,6 +505,9 @@ public class TestRootedOzoneFileSystem {
 
   /**
    * Helper function for testListStatusRootAndVolume*.
+   * Each call creates one volume, one bucket under that volume,
+   * two dirs under that bucket, one subdir under one of the dirs,
+   * and one file (file1) directly under dir2.
    */
   private Path createRandomVolumeBucketWithDirs() throws IOException {
     String volume1 = getRandomNonExistVolumeName();
@@ -515,6 +522,11 @@ public class TestRootedOzoneFileSystem {
     Path dir2 = new Path(bucketPath1, "dir2");
     fs.mkdirs(dir2);
 
+    try (FSDataOutputStream stream =
+        ofs.create(new Path(dir2, "file1"))) {
+      stream.write(1);
+    }
+
     return bucketPath1;
   }
 
@@ -556,14 +568,11 @@ public class TestRootedOzoneFileSystem {
   }
 
   /**
-   * Helper function to call adapter impl for listStatus (with recursive).
+   * Calls adapter#listStatus directly and converts results to FileStatus.
    */
-  private List<FileStatus> listStatusCallAdapterHelper(String pathStr)
-      throws IOException {
-    // FileSystem interface does not support recursive listStatus, use adapter
-    BasicRootedOzoneClientAdapterImpl adapter =
-        (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
-    return adapter.listStatus(pathStr, true, "", 1000,
+  private List<FileStatus> callAdapterListStatus(String pathStr,
+      boolean recursive, String startPath, long numEntries) throws IOException {
+    return adapter.listStatus(pathStr, recursive, startPath, numEntries,
         ofs.getUri(), ofs.getWorkingDirectory(), ofs.getUsername())
         .stream().map(ofs::convertFileStatus).collect(Collectors.toList());
   }
@@ -574,8 +583,8 @@ public class TestRootedOzoneFileSystem {
    */
   private void listStatusCheckHelper(Path path) throws IOException {
     // Get recursive listStatus result directly from adapter impl
-    List<FileStatus> statusesFromAdapter =
-        listStatusCallAdapterHelper(path.toString());
+    List<FileStatus> statusesFromAdapter = callAdapterListStatus(
+        path.toString(), true, "", 1000);
     // Get recursive listStatus result with FileSystem API by simulating FsShell
     List<FileStatus> statusesFromFS = new ArrayList<>();
     listStatusRecursiveHelper(path, statusesFromFS);
@@ -602,7 +611,7 @@ public class TestRootedOzoneFileSystem {
    * OFS: Test recursive listStatus on root and volume.
    */
   @Test
-  public void testListStatusRootAndVolumeRecursive() throws Exception {
+  public void testListStatusRootAndVolumeRecursive() throws IOException {
     Path bucketPath1 = createRandomVolumeBucketWithDirs();
     createRandomVolumeBucketWithDirs();
     // listStatus("/volume/bucket")
@@ -616,4 +625,68 @@ public class TestRootedOzoneFileSystem {
     listStatusCheckHelper(root);
   }
 
+  /**
+   * FileSystem#listStatus on steroids: recursion, startPath, page size.
+   * NOTE(review): the do-while exits after the first call; confirm intent.
+   * @param f Given path
+   * @param recursive List contents inside subdirectories
+   * @param startPath Starting path of the batch
+   * @param numEntries Max number of entries in result (asserted > 0)
+   * @return Array of the statuses of the files/directories in the given path
+   * @throws IOException See specific implementation
+   */
+  private FileStatus[] customListStatus(Path f, boolean recursive,
+      String startPath, int numEntries) throws IOException {
+    Assert.assertTrue(numEntries > 0);
+    LinkedList<FileStatus> statuses = new LinkedList<>();
+    List<FileStatus> tmpStatusList;
+    do {
+      tmpStatusList = callAdapterListStatus(f.toString(), recursive,
+          startPath, numEntries - statuses.size()); // remaining quota
+      if (!tmpStatusList.isEmpty()) {
+        statuses.addAll(tmpStatusList);
+        startPath = statuses.getLast().getPath().toString();
+      }
+    } while (tmpStatusList.size() == numEntries &&
+        statuses.size() < numEntries);
+    return statuses.toArray(new FileStatus[0]);
+  }
+
+  @Test
+  public void testListStatusRootAndVolumeContinuation() throws IOException {
+    for (int i = 0; i < 5; i++) {
+      createRandomVolumeBucketWithDirs();
+    }
+    // As with the recursive option, continuation can't be tested directly via
+    // FileSystem because LISTING_PAGE_SIZE can't be changed; use the adapter.
+
+    // numEntries > 5
+    FileStatus[] fileStatusesOver = customListStatus(
+        new Path("/"), false, "", 8);
+    // Expects exactly the 5 volumes created above; assumes no others exist
+    Assert.assertEquals(5, fileStatusesOver.length);
+
+    // numEntries = 5
+    FileStatus[] fileStatusesExact = customListStatus(
+        new Path("/"), false, "", 5);
+    Assert.assertEquals(5, fileStatusesExact.length);
+
+    // numEntries < 5
+    FileStatus[] fileStatusesLimit1 = customListStatus(
+        new Path("/"), false, "", 3);
+    // Only 3 volumes should be returned, even though more exist, due to the
+    // specified limit
+    Assert.assertEquals(3, fileStatusesLimit1.length);
+
+    // Get the last entry in the list as startPath
+    String nextStartPath = fileStatusesLimit1[fileStatusesLimit1.length - 1]
+        .getPath().toString();
+    FileStatus[] fileStatusesLimit2 = customListStatus(
+        new Path("/"), false, nextStartPath, 3);
+    // Note: at the time of writing this test, OmMetadataManagerImpl#listVolumes
+    //  excludes startVolume (startPath) from the result. Might change.
+    Assert.assertEquals(fileStatusesOver.length,
+        fileStatusesLimit1.length + fileStatusesLimit2.length);
+  }
+
 }
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 69a350c..3baba25 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -496,15 +497,13 @@ public class BasicRootedOzoneClientAdapterImpl
     OFSPath ofsStartPath = new OFSPath(startPath);
     // list volumes
     Iterator<? extends OzoneVolume> iter = objectStore.listVolumesByUser(
-        username, ofsStartPath.getVolumeName(), null);
+        username, null, ofsStartPath.getVolumeName());
     List<FileStatusAdapter> res = new ArrayList<>();
-    // TODO: Test continuation
-    while (iter.hasNext() && res.size() <= numEntries) {
+    while (iter.hasNext() && res.size() < numEntries) {
       OzoneVolume volume = iter.next();
       res.add(getFileStatusAdapterForVolume(volume, uri));
       if (recursive) {
         String pathStrNextVolume = volume.getName();
-        // TODO: Check startPath
         res.addAll(listStatus(pathStrNextVolume, recursive, startPath,
             numEntries - res.size(), uri, workingDir, username));
       }
@@ -525,13 +524,11 @@ public class BasicRootedOzoneClientAdapterImpl
     Iterator<? extends OzoneBucket> iter =
         volume.listBuckets(null, ofsStartPath.getBucketName());
     List<FileStatusAdapter> res = new ArrayList<>();
-    // TODO: Test continuation
-    while (iter.hasNext() && res.size() <= numEntries) {
+    while (iter.hasNext() && res.size() < numEntries) {
       OzoneBucket bucket = iter.next();
       res.add(getFileStatusAdapterForBucket(bucket, uri, username));
       if (recursive) {
         String pathStrNext = volumeStr + OZONE_URI_DELIMITER + bucket.getName();
-        // TODO: Check startPath
         res.addAll(listStatus(pathStrNext, recursive, startPath,
             numEntries - res.size(), uri, workingDir, username));
       }
@@ -548,6 +545,10 @@ public class BasicRootedOzoneClientAdapterImpl
    * @param startPath Start path of next batch of result for continuation.
    *                  This takes an absolute path from OFS root. e.g.
    *                  /volumeA/bucketB/dirC/fileD
+   *                  Note startPath can optionally begin with uri, e.g.
+   *                  when uri=ofs://svc1
+   *                  startPath=ofs://svc1/volumeA/bucketB/dirC/fileD
+   *                  will be accepted, but NOT startPath=ofs://svc2/volumeA/...
    * @param numEntries Number of maximum entries in the batch.
    * @param uri URI of OFS root.
    *            Used in making the return path qualified.
@@ -563,6 +564,19 @@ public class BasicRootedOzoneClientAdapterImpl
       Path workingDir, String username) throws IOException {
 
     incrementCounter(Statistic.OBJECTS_LIST);
+    // Remove authority from startPath if it exists
+    if (startPath.startsWith(uri.toString())) {
+      try {
+        startPath = new URI(startPath).getPath();
+      } catch (URISyntaxException ex) {
+        throw new IOException(ex);
+      }
+    }
+    // Note: startPath keeps its authority here if it doesn't match uri, by
+    //  design, so the OFSPath initializer errors out and startPaths from
+    //  other authorities are refused. NOTE(review): startsWith is a raw
+    //  prefix check, so uri ofs://svc1 may also match ofs://svc10/... .
+
     OFSPath ofsPath = new OFSPath(pathStr);
     if (ofsPath.isRoot()) {
       return listStatusRoot(
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
index 02b8e02..c9ae6a7 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
@@ -18,8 +18,12 @@
 package org.apache.hadoop.fs.ozone;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.http.ParseException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
+
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.StringTokenizer;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -60,6 +64,18 @@ class OFSPath {
   }
 
   private void initOFSPath(String pathStr) {
+    // pathStr should not have authority
+    try {
+      URI uri = new URI(pathStr);
+      String authority = uri.getAuthority();
+      if (authority != null && !authority.isEmpty()) {
+        throw new ParseException("Invalid path " + pathStr +
+            ". Shouldn't contain authority.");
+      }
+    } catch (URISyntaxException ex) {
+      throw new ParseException("Failed to parse path " + pathStr + " as URI.");
+    }
+    // tokenize
     StringTokenizer token = new StringTokenizer(pathStr, OZONE_URI_DELIMITER);
     int numToken = token.countTokens();
     if (numToken > 0) {
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
index ed6a069..05e1ce2 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
@@ -91,6 +91,17 @@ public class TestOFSPath {
   }
 
   @Test
+  public void testParsingWithAuthority() {
+    try {
+      new OFSPath("ofs://svc1/volume1/bucket1/dir1/");
+      Assert.fail(
+          "Should have thrown exception when parsing path with authority.");
+    } catch (Exception ignored) {
+      // Expected: OFSPath refuses paths that contain an authority.
+    }
+  }
+
+  @Test
   public void testParsingMount() {
     // Mount only
     OFSPath ofsPath = new OFSPath("/tmp/");


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org