You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by xy...@apache.org on 2020/02/28 19:03:48 UTC
[hadoop-ozone] branch HDDS-2665-ofs updated: HDDS-3073. Implement
ofs://: Fix listStatus continuation (#606)
This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch HDDS-2665-ofs
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2665-ofs by this push:
new 0610ce3 HDDS-3073. Implement ofs://: Fix listStatus continuation (#606)
0610ce3 is described below
commit 0610ce39c51a7eba728be418f3d6cd425cafe2ad
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Fri Feb 28 11:03:37 2020 -0800
HDDS-3073. Implement ofs://: Fix listStatus continuation (#606)
---
.../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 93 +++++++++++++++++++---
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 28 +++++--
.../java/org/apache/hadoop/fs/ozone/OFSPath.java | 16 ++++
.../org/apache/hadoop/fs/ozone/TestOFSPath.java | 11 +++
4 files changed, 131 insertions(+), 17 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index 033efdb..a881f10 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.ozone;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -67,6 +69,7 @@ public class TestRootedOzoneFileSystem {
private static FileSystem fs;
private static RootedOzoneFileSystem ofs;
private static ObjectStore objectStore;
+ private static BasicRootedOzoneClientAdapterImpl adapter;
private String volumeName;
private String bucketName;
@@ -101,6 +104,7 @@ public class TestRootedOzoneFileSystem {
conf.set("fs.ofs.impl", "org.apache.hadoop.fs.ozone.RootedOzoneFileSystem");
fs = FileSystem.get(conf);
ofs = (RootedOzoneFileSystem) fs;
+ adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
}
@After
@@ -501,6 +505,9 @@ public class TestRootedOzoneFileSystem {
/**
* Helper function for testListStatusRootAndVolume*.
+ * Each call creates one volume, one bucket under that volume,
+ * two dirs under that bucket, one subdir under one of the dirs,
+ * and one file under the subdir.
*/
private Path createRandomVolumeBucketWithDirs() throws IOException {
String volume1 = getRandomNonExistVolumeName();
@@ -515,6 +522,11 @@ public class TestRootedOzoneFileSystem {
Path dir2 = new Path(bucketPath1, "dir2");
fs.mkdirs(dir2);
+ try (FSDataOutputStream stream =
+ ofs.create(new Path(dir2, "file1"))) {
+ stream.write(1);
+ }
+
return bucketPath1;
}
@@ -556,14 +568,11 @@ public class TestRootedOzoneFileSystem {
}
/**
- * Helper function to call adapter impl for listStatus (with recursive).
+ * Helper function to call listStatus in adapter implementation.
*/
- private List<FileStatus> listStatusCallAdapterHelper(String pathStr)
- throws IOException {
- // FileSystem interface does not support recursive listStatus, use adapter
- BasicRootedOzoneClientAdapterImpl adapter =
- (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
- return adapter.listStatus(pathStr, true, "", 1000,
+ private List<FileStatus> callAdapterListStatus(String pathStr,
+ boolean recursive, String startPath, long numEntries) throws IOException {
+ return adapter.listStatus(pathStr, recursive, startPath, numEntries,
ofs.getUri(), ofs.getWorkingDirectory(), ofs.getUsername())
.stream().map(ofs::convertFileStatus).collect(Collectors.toList());
}
@@ -574,8 +583,8 @@ public class TestRootedOzoneFileSystem {
*/
private void listStatusCheckHelper(Path path) throws IOException {
// Get recursive listStatus result directly from adapter impl
- List<FileStatus> statusesFromAdapter =
- listStatusCallAdapterHelper(path.toString());
+ List<FileStatus> statusesFromAdapter = callAdapterListStatus(
+ path.toString(), true, "", 1000);
// Get recursive listStatus result with FileSystem API by simulating FsShell
List<FileStatus> statusesFromFS = new ArrayList<>();
listStatusRecursiveHelper(path, statusesFromFS);
@@ -602,7 +611,7 @@ public class TestRootedOzoneFileSystem {
* OFS: Test recursive listStatus on root and volume.
*/
@Test
- public void testListStatusRootAndVolumeRecursive() throws Exception {
+ public void testListStatusRootAndVolumeRecursive() throws IOException {
Path bucketPath1 = createRandomVolumeBucketWithDirs();
createRandomVolumeBucketWithDirs();
// listStatus("/volume/bucket")
@@ -616,4 +625,68 @@ public class TestRootedOzoneFileSystem {
listStatusCheckHelper(root);
}
+ /**
+ * Helper function. FileSystem#listStatus on steroids:
+ * Supports recursion, start path and custom listing page size (numEntries).
+ * @param f Given path
+ * @param recursive List contents inside subdirectories
+ * @param startPath Starting path of the batch
+ * @param numEntries Max number of entries in result
+ * @return Array of the statuses of the files/directories in the given path
+ * @throws IOException See specific implementation
+ */
+ private FileStatus[] customListStatus(Path f, boolean recursive,
+ String startPath, int numEntries) throws IOException {
+ Assert.assertTrue(numEntries > 0);
+ LinkedList<FileStatus> statuses = new LinkedList<>();
+ List<FileStatus> tmpStatusList;
+ do {
+ tmpStatusList = callAdapterListStatus(f.toString(), recursive,
+ startPath, numEntries - statuses.size());
+ if (!tmpStatusList.isEmpty()) {
+ statuses.addAll(tmpStatusList);
+ startPath = statuses.getLast().getPath().toString();
+ }
+ } while (tmpStatusList.size() == numEntries &&
+ statuses.size() < numEntries);
+ return statuses.toArray(new FileStatus[0]);
+ }
+
+ @Test
+ public void testListStatusRootAndVolumeContinuation() throws IOException {
+ for (int i = 0; i < 5; i++) {
+ createRandomVolumeBucketWithDirs();
+ }
+ // Similar to recursive option, we can't test continuation directly with
+ // FileSystem because we can't change LISTING_PAGE_SIZE. Use adapter instead
+
+ // numEntries > 5
+ FileStatus[] fileStatusesOver = customListStatus(
+ new Path("/"), false, "", 8);
+ // There are only 5 volumes
+ Assert.assertEquals(5, fileStatusesOver.length);
+
+ // numEntries = 5
+ FileStatus[] fileStatusesExact = customListStatus(
+ new Path("/"), false, "", 5);
+ Assert.assertEquals(5, fileStatusesExact.length);
+
+ // numEntries < 5
+ FileStatus[] fileStatusesLimit1 = customListStatus(
+ new Path("/"), false, "", 3);
+ // Should return only 3 volumes because of the specified limit, even
+ // though more volumes exist
+ Assert.assertEquals(3, fileStatusesLimit1.length);
+
+ // Get the last entry in the list as startPath
+ String nextStartPath = fileStatusesLimit1[fileStatusesLimit1.length - 1]
+ .getPath().toString();
+ FileStatus[] fileStatusesLimit2 = customListStatus(
+ new Path("/"), false, nextStartPath, 3);
+ // Note: at the time of writing this test, OmMetadataManagerImpl#listVolumes
+ // excludes startVolume (startPath) from the result. Might change.
+ Assert.assertEquals(fileStatusesOver.length,
+ fileStatusesLimit1.length + fileStatusesLimit2.length);
+ }
+
}
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 69a350c..3baba25 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -496,15 +497,13 @@ public class BasicRootedOzoneClientAdapterImpl
OFSPath ofsStartPath = new OFSPath(startPath);
// list volumes
Iterator<? extends OzoneVolume> iter = objectStore.listVolumesByUser(
- username, ofsStartPath.getVolumeName(), null);
+ username, null, ofsStartPath.getVolumeName());
List<FileStatusAdapter> res = new ArrayList<>();
- // TODO: Test continuation
- while (iter.hasNext() && res.size() <= numEntries) {
+ while (iter.hasNext() && res.size() < numEntries) {
OzoneVolume volume = iter.next();
res.add(getFileStatusAdapterForVolume(volume, uri));
if (recursive) {
String pathStrNextVolume = volume.getName();
- // TODO: Check startPath
res.addAll(listStatus(pathStrNextVolume, recursive, startPath,
numEntries - res.size(), uri, workingDir, username));
}
@@ -525,13 +524,11 @@ public class BasicRootedOzoneClientAdapterImpl
Iterator<? extends OzoneBucket> iter =
volume.listBuckets(null, ofsStartPath.getBucketName());
List<FileStatusAdapter> res = new ArrayList<>();
- // TODO: Test continuation
- while (iter.hasNext() && res.size() <= numEntries) {
+ while (iter.hasNext() && res.size() < numEntries) {
OzoneBucket bucket = iter.next();
res.add(getFileStatusAdapterForBucket(bucket, uri, username));
if (recursive) {
String pathStrNext = volumeStr + OZONE_URI_DELIMITER + bucket.getName();
- // TODO: Check startPath
res.addAll(listStatus(pathStrNext, recursive, startPath,
numEntries - res.size(), uri, workingDir, username));
}
@@ -548,6 +545,10 @@ public class BasicRootedOzoneClientAdapterImpl
* @param startPath Start path of next batch of result for continuation.
* This takes an absolute path from OFS root. e.g.
* /volumeA/bucketB/dirC/fileD
+ * Note startPath can optionally begin with uri, e.g.
+ * when uri=ofs://svc1
+ * startPath=ofs://svc1/volumeA/bucketB/dirC/fileD
+ * will be accepted, but NOT startPath=ofs://svc2/volumeA/...
* @param numEntries Number of maximum entries in the batch.
* @param uri URI of OFS root.
* Used in making the return path qualified.
@@ -563,6 +564,19 @@ public class BasicRootedOzoneClientAdapterImpl
Path workingDir, String username) throws IOException {
incrementCounter(Statistic.OBJECTS_LIST);
+ // Remove authority from startPath if it exists
+ if (startPath.startsWith(uri.toString())) {
+ try {
+ startPath = new URI(startPath).getPath();
+ } catch (URISyntaxException ex) {
+ throw new IOException(ex);
+ }
+ }
+ // Note: startPath could still have authority at this point if its
+ // authority doesn't match uri. This is by design. In this case,
+ // OFSPath initializer will error out.
+ // The goal is to refuse processing startPaths from other authorities.
+
OFSPath ofsPath = new OFSPath(pathStr);
if (ofsPath.isRoot()) {
return listStatusRoot(
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
index 02b8e02..c9ae6a7 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OFSPath.java
@@ -18,8 +18,12 @@
package org.apache.hadoop.fs.ozone;
import org.apache.hadoop.fs.Path;
+import org.apache.http.ParseException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
+
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.StringTokenizer;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -60,6 +64,18 @@ class OFSPath {
}
private void initOFSPath(String pathStr) {
+ // pathStr should not have authority
+ try {
+ URI uri = new URI(pathStr);
+ String authority = uri.getAuthority();
+ if (authority != null && !authority.isEmpty()) {
+ throw new ParseException("Invalid path " + pathStr +
+ ". Shouldn't contain authority.");
+ }
+ } catch (URISyntaxException ex) {
+ throw new ParseException("Failed to parse path " + pathStr + " as URI.");
+ }
+ // tokenize
StringTokenizer token = new StringTokenizer(pathStr, OZONE_URI_DELIMITER);
int numToken = token.countTokens();
if (numToken > 0) {
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
index ed6a069..05e1ce2 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOFSPath.java
@@ -91,6 +91,17 @@ public class TestOFSPath {
}
@Test
+ public void testParsingWithAuthority() {
+ try {
+ new OFSPath("ofs://svc1/volume1/bucket1/dir1/");
+ Assert.fail(
+ "Should have thrown exception when parsing path with authority.");
+ } catch (Exception ignored) {
+ // Test passes
+ }
+ }
+
+ @Test
public void testParsingMount() {
// Mount only
OFSPath ofsPath = new OFSPath("/tmp/");
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org