You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/09/17 11:07:47 UTC
[23/26] hadoop git commit: HDFS-10823. Implement HttpFSFileSystem#listStatusIterator.
HDFS-10823. Implement HttpFSFileSystem#listStatusIterator.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8a409530
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8a409530
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8a409530
Branch: refs/heads/HADOOP-12756
Commit: 8a40953058d50d421d62b71067a13b626b3cba1f
Parents: f6f3a44
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Sep 16 15:37:36 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Sep 16 15:37:36 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/FileSystem.java | 125 ++++++++++++++++---
.../apache/hadoop/fs/TestFilterFileSystem.java | 1 +
.../org/apache/hadoop/fs/TestHarFileSystem.java | 1 +
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 68 ++++------
.../hadoop/fs/http/client/HttpFSFileSystem.java | 56 +++++++--
.../hadoop/fs/http/client/HttpFSUtils.java | 2 +
.../hadoop/fs/http/server/FSOperations.java | 62 +++++++++
.../http/server/HttpFSParametersProvider.java | 20 +++
.../hadoop/fs/http/server/HttpFSServer.java | 17 +++
.../service/hadoop/FileSystemAccessService.java | 4 +-
.../fs/http/client/BaseTestHttpFSWith.java | 62 ++++++++-
11 files changed, 340 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 9bde29d..5939f97 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.Tracer;
import org.apache.htrace.core.TraceScope;
+import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import static com.google.common.base.Preconditions.checkArgument;
@@ -1530,7 +1531,68 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException;
-
+
+ /**
+ * Represents a batch of directory entries when iteratively listing a
+ * directory. This is a private API not meant for use by end users.
+ * <p>
+ * For internal use by FileSystem subclasses that override
+ * {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
+ * listing.
+ */
+ @InterfaceAudience.Private
+ public static class DirectoryEntries {
+ private final FileStatus[] entries;
+ private final byte[] token;
+ private final boolean hasMore;
+
+ public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
+ hasMore) {
+ this.entries = entries;
+ if (token != null) {
+ this.token = token.clone();
+ } else {
+ this.token = null;
+ }
+ this.hasMore = hasMore;
+ }
+
+ public FileStatus[] getEntries() {
+ return entries;
+ }
+
+ public byte[] getToken() {
+ return token;
+ }
+
+ public boolean hasMore() {
+ return hasMore;
+ }
+ }
+
+ /**
+ * Given an opaque iteration token, return the next batch of entries in a
+ * directory. This is a private API not meant for use by end users.
+ * <p>
+ * This method should be overridden by FileSystem subclasses that want to
+ * use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
+ * @param f Path to list
+ * @param token opaque iteration token returned by previous call, or null
+ * if this is the first call.
+ * @return the next batch of entries, with its token and hasMore flag
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ @InterfaceAudience.Private
+ protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException {
+ // The default implementation returns the entire listing as a single batch.
+ // Thus, there is never a second batch, and no need to respect the passed
+ // token or set a token in the returned DirectoryEntries.
+ FileStatus[] listing = listStatus(f);
+ return new DirectoryEntries(listing, null, false);
+ }
+
/*
* Filter files/directories in the given path using the user-supplied path
* filter. Results are added to the given array <code>results</code>.
@@ -1767,6 +1829,49 @@ public abstract class FileSystem extends Configured implements Closeable {
}
/**
+ * Generic iterator for implementing {@link #listStatusIterator(Path)}.
+ */
+ private class DirListingIterator<T extends FileStatus> implements
+ RemoteIterator<T> {
+
+ private final Path path;
+ private DirectoryEntries entries;
+ private int i = 0;
+
+ DirListingIterator(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (entries == null) {
+ fetchMore();
+ }
+ return i < entries.getEntries().length ||
+ entries.hasMore();
+ }
+
+ private void fetchMore() throws IOException {
+ byte[] token = null;
+ if (entries != null) {
+ token = entries.getToken();
+ }
+ entries = listStatusBatch(path, token);
+ i = 0;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T next() throws IOException {
+ Preconditions.checkState(hasNext(), "No more items in iterator");
+ if (i == entries.getEntries().length) {
+ fetchMore();
+ }
+ return (T)entries.getEntries()[i++];
+ }
+ }
+
+ /**
* Returns a remote iterator so that followup calls are made on demand
* while consuming the entries. Each file system implementation should
* override this method and provide a more efficient implementation, if
@@ -1779,23 +1884,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws FileNotFoundException, IOException {
- return new RemoteIterator<FileStatus>() {
- private final FileStatus[] stats = listStatus(p);
- private int i = 0;
-
- @Override
- public boolean hasNext() {
- return i<stats.length;
- }
-
- @Override
- public FileStatus next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException("No more entry in " + p);
- }
- return stats[i++];
- }
- };
+ return new DirListingIterator<>(p);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index e1312bc..24f3dc8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -103,6 +103,7 @@ public class TestFilterFileSystem {
public void processDeleteOnExit();
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
+ public FileStatus[] listStatusBatch(Path f, byte[] token);
public FileStatus[] listStatus(Path[] files);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
public FileStatus[] globStatus(Path pathPattern);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index d2020b9..bacdbb7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -115,6 +115,7 @@ public class TestHarFileSystem {
public QuotaUsage getQuotaUsage(Path f);
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
+ public FileStatus[] listStatusBatch(Path f, byte[] token);
public FileStatus[] listStatus(Path[] files);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
public FileStatus[] globStatus(Path pathPattern);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 9a9edc8..19de5b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
@@ -1504,55 +1503,30 @@ public class WebHdfsFileSystem extends FileSystem
}
private static final byte[] EMPTY_ARRAY = new byte[] {};
- private class DirListingIterator<T extends FileStatus> implements
- RemoteIterator<T> {
- private final Path path;
- private DirectoryListing thisListing;
- private int i = 0;
- private byte[] prevKey = EMPTY_ARRAY;
-
- DirListingIterator(Path path) {
- this.path = path;
- }
-
- @Override
- public boolean hasNext() throws IOException {
- if (thisListing == null) {
- fetchMore();
- }
- return i < thisListing.getPartialListing().length ||
- thisListing.hasMore();
- }
-
- private void fetchMore() throws IOException {
- thisListing = new FsPathResponseRunner<DirectoryListing>(
- GetOpParam.Op.LISTSTATUS_BATCH,
- path, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
- @Override
- DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
- return JsonUtilClient.toDirectoryListing(json);
- }
- }.run();
- i = 0;
- prevKey = thisListing.getLastName();
+ @Override
+ public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException {
+ byte[] prevKey = EMPTY_ARRAY;
+ if (token != null) {
+ prevKey = token;
}
-
- @Override
- @SuppressWarnings("unchecked")
- public T next() throws IOException {
- Preconditions.checkState(hasNext(), "No more items in iterator");
- if (i == thisListing.getPartialListing().length) {
- fetchMore();
+ DirectoryListing listing = new FsPathResponseRunner<DirectoryListing>(
+ GetOpParam.Op.LISTSTATUS_BATCH,
+ f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
+ @Override
+ DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtilClient.toDirectoryListing(json);
}
- return (T)makeQualified(thisListing.getPartialListing()[i++], path);
- }
- }
-
- @Override
- public RemoteIterator<FileStatus> listStatusIterator(final Path f)
- throws FileNotFoundException, IOException {
- return new DirListingIterator<>(f);
+ }.run();
+ // Qualify the returned FileStatus array
+ final HdfsFileStatus[] statuses = listing.getPartialListing();
+ FileStatus[] qualified = new FileStatus[statuses.length];
+ for (int i = 0; i < statuses.length; i++) {
+ qualified[i] = makeQualified(statuses[i], f);
+ }
+ return new DirectoryEntries(qualified, listing.getLastName(),
+ listing.hasMore());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index 0f97d90..77e3323 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.http.client;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+
+import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@@ -111,6 +113,7 @@ public class HttpFSFileSystem extends FileSystem
public static final String XATTR_SET_FLAG_PARAM = "flag";
public static final String XATTR_ENCODING_PARAM = "encoding";
public static final String NEW_LENGTH_PARAM = "newlength";
+ public static final String START_AFTER_PARAM = "startAfter";
public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
@@ -184,6 +187,10 @@ public class HttpFSFileSystem extends FileSystem
public static final String ENC_BIT_JSON = "encBit";
+ public static final String DIRECTORY_LISTING_JSON = "DirectoryListing";
+ public static final String PARTIAL_LISTING_JSON = "partialListing";
+ public static final String REMAINING_ENTRIES_JSON = "remainingEntries";
+
public static final int HTTP_TEMPORARY_REDIRECT = 307;
private static final String HTTP_GET = "GET";
@@ -203,7 +210,7 @@ public class HttpFSFileSystem extends FileSystem
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
- REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET);
+ REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET);
private String httpMethod;
@@ -666,6 +673,17 @@ public class HttpFSFileSystem extends FileSystem
return (Boolean) json.get(DELETE_JSON);
}
+ private FileStatus[] toFileStatuses(JSONObject json, Path f) {
+ json = (JSONObject) json.get(FILE_STATUSES_JSON);
+ JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
+ FileStatus[] array = new FileStatus[jsonArray.size()];
+ f = makeQualified(f);
+ for (int i = 0; i < jsonArray.size(); i++) {
+ array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+ }
+ return array;
+ }
+
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
@@ -684,14 +702,36 @@ public class HttpFSFileSystem extends FileSystem
params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
- json = (JSONObject) json.get(FILE_STATUSES_JSON);
- JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
- FileStatus[] array = new FileStatus[jsonArray.size()];
- f = makeQualified(f);
- for (int i = 0; i < jsonArray.size(); i++) {
- array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+ return toFileStatuses(json, f);
+ }
+
+ @Override
+ public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(OP_PARAM, Operation.LISTSTATUS_BATCH.toString());
+ if (token != null) {
+ params.put(START_AFTER_PARAM, new String(token, Charsets.UTF_8));
}
- return array;
+ HttpURLConnection conn = getConnection(
+ Operation.LISTSTATUS_BATCH.getMethod(),
+ params, f, true);
+ HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+ // Parse the FileStatus array
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+ JSONObject listing = (JSONObject) json.get(DIRECTORY_LISTING_JSON);
+ FileStatus[] statuses = toFileStatuses(
+ (JSONObject) listing.get(PARTIAL_LISTING_JSON), f);
+ // New token is the path name of the last FileStatus entry, as UTF-8 bytes
+ byte[] newToken = null;
+ if (statuses.length > 0) {
+ newToken = statuses[statuses.length - 1].getPath().getName().toString()
+ .getBytes(Charsets.UTF_8);
+ }
+ // Convert the numeric remainingEntries count into the hasMore boolean
+ final long remainingEntries = (Long) listing.get(REMAINING_ENTRIES_JSON);
+ final boolean hasMore = remainingEntries > 0 ? true : false;
+ return new DirectoryEntries(statuses, newToken, hasMore);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
index 95e26d7..fcc7bab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
@@ -43,6 +43,8 @@ public class HttpFSUtils {
public static final String SERVICE_VERSION = "/v1";
+ public static final byte[] EMPTY_BYTES = {};
+
private static final String SERVICE_PATH = SERVICE_NAME + SERVICE_VERSION;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
index 39597eb..46948f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -37,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -108,6 +110,27 @@ public class FSOperations {
return json;
}
+ /**
+ * Serializes a DirectoryEntries object into the JSON for a
+ * WebHDFS {@link org.apache.hadoop.hdfs.protocol.DirectoryListing}.
+ * <p>
+ * These two classes are slightly different, due to the impedance
+ * mismatches between the WebHDFS and FileSystem APIs.
+ * @param entries
+ * @return json
+ */
+ private static Map<String, Object> toJson(FileSystem.DirectoryEntries
+ entries) {
+ Map<String, Object> json = new LinkedHashMap<>();
+ Map<String, Object> inner = new LinkedHashMap<>();
+ Map<String, Object> fileStatuses = toJson(entries.getEntries());
+ inner.put(HttpFSFileSystem.PARTIAL_LISTING_JSON, fileStatuses);
+ inner.put(HttpFSFileSystem.REMAINING_ENTRIES_JSON, entries.hasMore() ? 1
+ : 0);
+ json.put(HttpFSFileSystem.DIRECTORY_LISTING_JSON, inner);
+ return json;
+ }
+
/** Converts an <code>AclStatus</code> object into a JSON object.
*
* @param aclStatus AclStatus object
@@ -625,6 +648,45 @@ public class FSOperations {
}
/**
+ * Executor that performs a batched directory listing.
+ */
+ @InterfaceAudience.Private
+ public static class FSListStatusBatch implements FileSystemAccess
+ .FileSystemExecutor<Map> {
+ private final Path path;
+ private final byte[] token;
+
+ public FSListStatusBatch(String path, byte[] token) throws IOException {
+ this.path = new Path(path);
+ this.token = token.clone();
+ }
+
+ /**
+ * Simple wrapper filesystem that exposes the protected batched
+ * listStatus API so we can use it.
+ */
+ private static class WrappedFileSystem extends FilterFileSystem {
+ public WrappedFileSystem(FileSystem f) {
+ super(f);
+ }
+
+ @Override
+ public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException {
+ return super.listStatusBatch(f, token);
+ }
+ }
+
+ @Override
+ public Map execute(FileSystem fs) throws IOException {
+ WrappedFileSystem wrappedFS = new WrappedFileSystem(fs);
+ FileSystem.DirectoryEntries entries =
+ wrappedFS.listStatusBatch(path, token);
+ return toJson(entries);
+ }
+ }
+
+ /**
* Executor that performs a mkdirs FileSystemAccess files system operation.
*/
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
index 5c4204a..25585c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
@@ -91,6 +91,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.GETXATTRS,
new Class[]{XAttrNameParam.class, XAttrEncodingParam.class});
PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
+ PARAMS_DEF.put(Operation.LISTSTATUS_BATCH,
+ new Class[]{StartAfterParam.class});
}
public HttpFSParametersProvider() {
@@ -520,4 +522,22 @@ public class HttpFSParametersProvider extends ParametersProvider {
super(NAME, XAttrCodec.class, null);
}
}
+
+ /**
+ * Class for startAfter parameter.
+ */
+ @InterfaceAudience.Private
+ public static class StartAfterParam extends StringParam {
+ /**
+ * Parameter name.
+ */
+ public static final String NAME = HttpFSFileSystem.START_AFTER_PARAM;
+
+ /**
+ * Constructor.
+ */
+ public StartAfterParam() {
+ super(NAME, null);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
index db4692a..a4db124 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.fs.http.server;
+import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
+import org.apache.hadoop.fs.http.client.HttpFSUtils;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AclPermissionParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
@@ -320,6 +322,21 @@ public class HttpFSServer {
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
+ case LISTSTATUS_BATCH: {
+ String startAfter = params.get(
+ HttpFSParametersProvider.StartAfterParam.NAME,
+ HttpFSParametersProvider.StartAfterParam.class);
+ byte[] token = HttpFSUtils.EMPTY_BYTES;
+ if (startAfter != null) {
+ token = startAfter.getBytes(Charsets.UTF_8);
+ }
+ FSOperations.FSListStatusBatch command = new FSOperations
+ .FSListStatusBatch(path, token);
+ @SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
+ AUDIT_LOG.info("[{}] token [{}]", path, token);
+ response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+ break;
+ }
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
index 88780cb..0b767be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java
@@ -84,7 +84,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
count = 0;
}
- synchronized FileSystem getFileSytem(Configuration conf)
+ synchronized FileSystem getFileSystem(Configuration conf)
throws IOException {
if (fs == null) {
fs = FileSystem.get(conf);
@@ -290,7 +290,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
}
Configuration conf = new Configuration(namenodeConf);
conf.set(HTTPFS_FS_USER, user);
- return cachedFS.getFileSytem(conf);
+ return cachedFS.getFileSystem(conf);
}
protected void closeFileSystem(FileSystem fs) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a409530/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
index aea6cf8..e475803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@@ -44,6 +45,7 @@ import org.apache.hadoop.test.TestHdfsHelper;
import org.apache.hadoop.test.TestJetty;
import org.apache.hadoop.test.TestJettyHelper;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -62,6 +64,7 @@ import java.io.Writer;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -136,14 +139,19 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
return "webhdfs";
}
- protected FileSystem getHttpFSFileSystem() throws Exception {
- Configuration conf = new Configuration();
+ protected FileSystem getHttpFSFileSystem(Configuration conf) throws
+ Exception {
conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
URI uri = new URI(getScheme() + "://" +
TestJettyHelper.getJettyURL().toURI().getAuthority());
return FileSystem.get(uri, conf);
}
+ protected FileSystem getHttpFSFileSystem() throws Exception {
+ Configuration conf = new Configuration();
+ return getHttpFSFileSystem(conf);
+ }
+
protected void testGet() throws Exception {
FileSystem fs = getHttpFSFileSystem();
Assert.assertNotNull(fs);
@@ -355,6 +363,51 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
assertEquals(stati[0].getPath().getName(), path.getName());
}
+ private static void assertSameListing(FileSystem expected, FileSystem
+ actual, Path p) throws IOException {
+ // Consume all the entries from both iterators
+ RemoteIterator<FileStatus> exIt = expected.listStatusIterator(p);
+ List<FileStatus> exStatuses = new ArrayList<>();
+ while (exIt.hasNext()) {
+ exStatuses.add(exIt.next());
+ }
+ RemoteIterator<FileStatus> acIt = actual.listStatusIterator(p);
+ List<FileStatus> acStatuses = new ArrayList<>();
+ while (acIt.hasNext()) {
+ acStatuses.add(acIt.next());
+ }
+ assertEquals(exStatuses.size(), acStatuses.size());
+ for (int i = 0; i < exStatuses.size(); i++) {
+ FileStatus expectedStatus = exStatuses.get(i);
+ FileStatus actualStatus = acStatuses.get(i);
+ // Path URIs are fully qualified, so compare just the path component
+ assertEquals(expectedStatus.getPath().toUri().getPath(),
+ actualStatus.getPath().toUri().getPath());
+ }
+ }
+
+ private void testListStatusBatch() throws Exception {
+ // LocalFileSystem writes checksum files next to the data files, which
+ // show up when listing via LFS. This makes the listings not compare
+ // properly.
+ Assume.assumeFalse(isLocalFS());
+
+ FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
+ Configuration conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
+ FileSystem httpFs = getHttpFSFileSystem(conf);
+
+ // Test an empty directory
+ Path dir = new Path(getProxiedFSTestDir(), "dir");
+ proxyFs.mkdirs(dir);
+ assertSameListing(proxyFs, httpFs, dir);
+ // Create and test in a loop
+ for (int i = 0; i < 10; i++) {
+ proxyFs.create(new Path(dir, "file" + i)).close();
+ assertSameListing(proxyFs, httpFs, dir);
+ }
+ }
+
private void testWorkingdirectory() throws Exception {
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path workingDir = fs.getWorkingDirectory();
@@ -863,7 +916,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
- GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION
+ GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH
}
private void operation(Operation op) throws Exception {
@@ -940,6 +993,9 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
case ENCRYPTION:
testEncryption();
break;
+ case LIST_STATUS_BATCH:
+ testListStatusBatch();
+ break;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org