You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2016/06/02 14:38:47 UTC
hadoop git commit: HDFS-10464: libhdfs++: Implement GetPathInfo and
ListDirectory. Contributed by Anatoli Shein.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-8707 43765269e -> f206a36c9
HDFS-10464: libhdfs++: Implement GetPathInfo and ListDirectory. Contributed by Anatoli Shein.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f206a36c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f206a36c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f206a36c
Branch: refs/heads/HDFS-8707
Commit: f206a36c9b1a2c4ca84cae129b89f4b94eb259ad
Parents: 4376526
Author: Bob Hansen <bo...@hp.com>
Authored: Thu Jun 2 10:38:05 2016 -0400
Committer: Bob Hansen <bo...@hp.com>
Committed: Thu Jun 2 10:38:05 2016 -0400
----------------------------------------------------------------------
.../libhdfs-tests/test_libhdfs_threaded.c | 54 +++-
.../native/libhdfspp/include/hdfspp/hdfspp.h | 30 ++-
.../native/libhdfspp/include/hdfspp/statinfo.h | 62 +++++
.../native/libhdfspp/include/hdfspp/status.h | 4 +-
.../native/libhdfspp/lib/bindings/c/hdfs.cc | 128 ++++++++++
.../main/native/libhdfspp/lib/common/status.cc | 10 +
.../main/native/libhdfspp/lib/fs/filesystem.cc | 245 +++++++++++++++++--
.../main/native/libhdfspp/lib/fs/filesystem.h | 29 +++
.../src/main/native/libhdfspp/tests/hdfs_shim.c | 11 +-
9 files changed, 543 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
index a922ff8..1e10159 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
@@ -27,6 +27,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <limits.h>
#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)
@@ -53,7 +54,7 @@ static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs
tPort port;
hdfsFS hdfs;
struct hdfsBuilder *bld;
-
+
port = (tPort)nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
@@ -106,7 +107,7 @@ static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path)
return ret;
} else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) {
fprintf(stderr, "hdfsGetDefaultBlockSizeAtPath(%s) got "
- "%"PRId64", but we expected %d\n",
+ "%"PRId64", but we expected %d\n",
path, blockSize, TLH_DEFAULT_BLOCK_SIZE);
return EIO;
}
@@ -153,6 +154,12 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
EXPECT_ZERO(doTestGetDefaultBlockSize(fs, paths->prefix));
+ /* There is no such directory.
+ * Check that errno is set to ENOENT
+ */
+ char invalid_path[] = "/some_invalid/path";
+ EXPECT_NULL_WITH_ERRNO(hdfsListDirectory(fs, invalid_path, &numEntries), ENOENT);
+
/* There should be no entry in the directory. */
errno = EACCES; // see if errno is set to 0 on success
EXPECT_NULL_WITH_ERRNO(hdfsListDirectory(fs, paths->prefix, &numEntries), 0);
@@ -188,11 +195,30 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
EXPECT_ZERO(hdfsCloseFile(fs, file));
/* There should be 1 entry in the directory. */
- EXPECT_NONNULL(hdfsListDirectory(fs, paths->prefix, &numEntries));
+ hdfsFileInfo * dirList = hdfsListDirectory(fs, paths->prefix, &numEntries);
+ EXPECT_NONNULL(dirList);
if (numEntries != 1) {
fprintf(stderr, "hdfsListDirectory set numEntries to "
"%d on directory containing 1 file.", numEntries);
}
+ hdfsFreeFileInfo(dirList, numEntries);
+
+ /* Create many files for ListDirectory to page through */
+ int nFile;
+ for (nFile = 0; nFile < 10000; nFile++) {
+ char filename[PATH_MAX];
+ snprintf(filename, PATH_MAX, "%s/many_files_%d", paths->prefix, nFile);
+ file = hdfsOpenFile(fs, filename, O_WRONLY, 0, 0, 0);
+ EXPECT_NONNULL(file);
+ EXPECT_ZERO(hdfsCloseFile(fs, file));
+ }
+ dirList = hdfsListDirectory(fs, paths->prefix, &numEntries);
+ EXPECT_NONNULL(dirList);
+ if (numEntries != 10002) {
+ fprintf(stderr, "hdfsListDirectory set numEntries to "
+ "%d on directory containing 10002 files.", numEntries);
+ }
+ hdfsFreeFileInfo(dirList, numEntries);
/* Let's re-open the file for reading */
file = hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0);
@@ -255,7 +281,25 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
snprintf(tmp, sizeof(tmp), "%s/nonexistent-file-name", paths->prefix);
EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, tmp, "ha3", NULL), ENOENT);
- return 0;
+
+ //Test case: File does not exist
+ EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, invalid_path), ENOENT);
+
+// Test case: No permission to access parent directory
+// Trying to set permissions of the parent directory to 0
+// by a super user, and then connecting as SomeGuy. Should
+// receive permission denied, but receives fileInfo.
+// EXPECT_ZERO(hdfsChmod(fs, paths->prefix, 0));
+// EXPECT_ZERO(hdfsChmod(fs, paths->file2, 0));
+// EXPECT_ZERO(hdfsDisconnect(fs));
+// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, "SomeGuy"));
+// EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, paths->file2), EACCES);
+// EXPECT_ZERO(hdfsDisconnect(fs));
+// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+// if (!fs) {
+// return 1;
+// }
+ return 0;
}
static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
@@ -310,7 +354,7 @@ static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
for (i = 0; i < tlhNumThreads; i++) {
if (ti[i].success != 0) {
fprintf(stderr, "%s%d", sep, i);
- sep = ", ";
+ sep = ", ";
}
}
fprintf(stderr, "]. FAILURE.\n");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 1e4455a..ec26a55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -22,6 +22,8 @@
#include "hdfspp/status.h"
#include "hdfspp/events.h"
#include "hdfspp/block_location.h"
+#include "hdfspp/statinfo.h"
+
#include <functional>
#include <memory>
@@ -170,6 +172,33 @@ class FileSystem {
virtual Status Open(const std::string &path, FileHandle **handle) = 0;
/**
+ * Returns metadata about the file if the file/directory exists.
+ **/
+ virtual void
+ GetFileInfo(const std::string &path,
+ const std::function<void(const Status &, const StatInfo &)> &handler) = 0;
+ virtual Status GetFileInfo(const std::string &path, StatInfo & stat_info) = 0;
+
+ /**
+ * Retrieves the files contained in a directory and returns the metadata
+ * for each of them.
+ *
+ * The asynchronous method will return batches of files; the consumer must
+ * return true if they want more files to be delivered. The final bool
+ * parameter in the callback will be set to true if this is the final
+ * batch of files.
+ *
+ * The synchronous method will return all files in the directory.
+ *
+ * Path must be an absolute path in the hdfs filesystem (e.g. /tmp/foo/bar)
+ **/
+ virtual void
+ GetListing(const std::string &path,
+ const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) = 0;
+ virtual Status GetListing(const std::string &path,
+ std::shared_ptr<std::vector<StatInfo>> & stat_infos) = 0;
+
+ /**
* Returns the locations of all known blocks for the indicated file, or an error
if the information could not be found
*/
@@ -178,7 +207,6 @@ class FileSystem {
virtual Status GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * locations) = 0;
-
/**
* Note that it is an error to destroy the filesystem from within a filesystem
* callback. It will lead to a deadlock and the termination of the process.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h
new file mode 100644
index 0000000..a53ab8b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HDFSPP_STATINFO_H_
+#define HDFSPP_STATINFO_H_
+
+namespace hdfs {
+
+/**
+ * Information that is assumed to be unchanging about a file for the duration of
+ * the operations.
+ */
+struct StatInfo {
+ enum FileType {
+ IS_DIR = 1,
+ IS_FILE = 2,
+ IS_SYMLINK = 3
+ };
+
+ int file_type;
+ ::std::string path;
+ unsigned long int length;
+ unsigned long int permissions; //Octal number as in POSIX permissions; e.g. 0777
+ ::std::string owner;
+ ::std::string group;
+ unsigned long int modification_time;
+ unsigned long int access_time;
+ ::std::string symlink;
+ unsigned int block_replication;
+ unsigned long int blocksize;
+ unsigned long int fileid;
+ unsigned long int children_num;
+ StatInfo()
+ : file_type(0),
+ length(0),
+ permissions(0),
+ modification_time(0),
+ access_time(0),
+ block_replication(0),
+ blocksize(0),
+ fileid(0),
+ children_num(0) {
+ }
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
index 6a3ce73..7e8cdfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
@@ -38,7 +38,8 @@ class Status {
static Status Exception(const char *expception_class_name, const char *error_message);
static Status Error(const char *error_message);
static Status AuthenticationFailed();
- static Status Canceled();
+ static Status Canceled();
+ static Status PathNotFound(const char *msg);
// success
bool ok() const { return code_ == 0; }
@@ -56,6 +57,7 @@ class Status {
kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled),
kPermissionDenied = static_cast<unsigned>(std::errc::permission_denied),
+ kPathNotFound = static_cast<unsigned>(std::errc::no_such_file_or_directory),
kException = 256,
kAuthenticationFailed = 257,
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 8a73c41..0fe8479 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -26,6 +26,9 @@
#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>
+#include <libgen.h>
+#include "limits.h"
+
#include <string>
#include <cstring>
#include <iostream>
@@ -146,6 +149,10 @@ static int Error(const Status &stat) {
errnum = EACCES;
default_message = "Permission denied";
break;
+ case Status::Code::kPathNotFound:
+ errnum = ENOENT;
+ default_message = "No such file or directory";
+ break;
default:
errnum = ENOSYS;
default_message = "Error: unrecognised code";
@@ -316,6 +323,127 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
}
}
+void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
+ const hdfs::StatInfo & stat_info) {
+ /* file or directory */
+ if (stat_info.file_type == StatInfo::IS_DIR) {
+ file_info->mKind = kObjectKindDirectory;
+ } else if (stat_info.file_type == StatInfo::IS_FILE) {
+ file_info->mKind = kObjectKindFile;
+ } else {
+ file_info->mKind = kObjectKindFile;
+ LOG_WARN(kFileSystem, << "Symlink is not supported! Reporting as a file: ");
+ }
+
+ /* the name of the file */
+ char copyOfPath[PATH_MAX];
+ strncpy(copyOfPath, stat_info.path.c_str(), PATH_MAX);
+ copyOfPath[PATH_MAX - 1] = '\0'; // in case strncpy ran out of space
+
+ char * mName = basename(copyOfPath);
+ size_t mName_size = strlen(mName);
+ file_info->mName = new char[mName_size+1];
+ strncpy(file_info->mName, basename(copyOfPath), mName_size + 1);
+
+ /* the last modification time for the file in seconds */
+ file_info->mLastMod = (tTime) stat_info.modification_time;
+
+ /* the size of the file in bytes */
+ file_info->mSize = (tOffset) stat_info.length;
+
+ /* the count of replicas */
+ file_info->mReplication = (short) stat_info.block_replication;
+
+ /* the block size for the file */
+ file_info->mBlockSize = (tOffset) stat_info.blocksize;
+
+ /* the owner of the file */
+ file_info->mOwner = new char[stat_info.owner.size() + 1];
+ strncpy(file_info->mOwner, stat_info.owner.c_str(), stat_info.owner.size() + 1);
+
+ /* the group associated with the file */
+ file_info->mGroup = new char[stat_info.group.size() + 1];
+ strncpy(file_info->mGroup, stat_info.group.c_str(), stat_info.group.size() + 1);
+
+ /* the permissions associated with the file */
+ file_info->mPermissions = (short) stat_info.permissions;
+
+ /* the last access time for the file in seconds */
+ file_info->mLastAccess = stat_info.access_time;
+}
+
+hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
+ try {
+ if (!CheckSystem(fs)) {
+ return nullptr;
+ }
+
+ hdfs::StatInfo stat_info;
+ Status stat = fs->get_impl()->GetFileInfo(path, stat_info);
+ if (!stat.ok()) {
+ Error(stat);
+ return nullptr;
+ }
+ hdfsFileInfo *file_info = new hdfsFileInfo[1];
+ StatInfoToHdfsFileInfo(file_info, stat_info);
+ return file_info;
+ } catch (const std::exception & e) {
+ ReportException(e);
+ return nullptr;
+ } catch (...) {
+ ReportCaughtNonException();
+ return nullptr;
+ }
+}
+
+hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
+ try {
+ if (!CheckSystem(fs)) {
+ *numEntries = 0;
+ return nullptr;
+ }
+
+ std::shared_ptr<std::vector<StatInfo>> stat_infos;
+ Status stat = fs->get_impl()->GetListing(path, stat_infos);
+ if (!stat.ok()) {
+ Error(stat);
+ *numEntries = 0;
+ return nullptr;
+ }
+ //Existing API expects nullptr if size is 0
+ if(!stat_infos || stat_infos->size()==0){
+ *numEntries = 0;
+ return nullptr;
+ }
+ *numEntries = stat_infos->size();
+ hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos->size()];
+ for(std::vector<StatInfo>::size_type i = 0; i < stat_infos->size(); i++) {
+ StatInfoToHdfsFileInfo(&file_infos[i], stat_infos->at(i));
+ }
+
+ return file_infos;
+ } catch (const std::exception & e) {
+ ReportException(e);
+ *numEntries = 0;
+ return nullptr;
+ } catch (...) {
+ ReportCaughtNonException();
+ *numEntries = 0;
+ return nullptr;
+ }
+}
+
+void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
+{
+ int i;
+ for (i = 0; i < numEntries; ++i) {
+ delete[] hdfsFileInfo[i].mName;
+ delete[] hdfsFileInfo[i].mOwner;
+ delete[] hdfsFileInfo[i].mGroup;
+ }
+ delete[] hdfsFileInfo;
+}
+
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
tSize length) {
try
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
index e215a67..fd4df33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -26,6 +26,8 @@ namespace hdfs {
const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException";
const char * kStatusSaslException = "javax.security.sasl.SaslException";
+const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException";
+const char * kPathNotFoundException2 = "java.io.FileNotFoundException";
Status::Status(int code, const char *msg1) : code_(code) {
if(msg1) {
@@ -53,6 +55,10 @@ Status Status::InvalidArgument(const char *msg) {
return Status(kInvalidArgument, msg);
}
+Status Status::PathNotFound(const char *msg){
+ return Status(kPathNotFound, msg);
+}
+
Status Status::ResourceUnavailable(const char *msg) {
return Status(kResourceUnavailable, msg);
}
@@ -66,6 +72,10 @@ Status Status::Exception(const char *exception_class_name, const char *error_mes
return Status(kPermissionDenied, error_message);
else if (exception_class_name && (strcmp(exception_class_name, kStatusSaslException) == 0))
return AuthenticationFailed();
+ else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException) == 0))
+ return Status(kPathNotFound, error_message);
+ else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0))
+ return Status(kPathNotFound, error_message);
else
return Status(kException, exception_class_name, error_message);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index d2e23b8..eace6fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -71,30 +71,19 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
using ::hadoop::hdfs::GetBlockLocationsResponseProto;
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
- << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
+ << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
- struct State {
- GetBlockLocationsRequestProto req;
- std::shared_ptr<GetBlockLocationsResponseProto> resp;
- };
-
- auto m = continuation::Pipeline<State>::Create();
- auto &req = m->state().req;
+ GetBlockLocationsRequestProto req;
req.set_src(path);
req.set_offset(0);
req.set_length(std::numeric_limits<long long>::max());
- m->state().resp.reset(new GetBlockLocationsResponseProto());
- State *s = &m->state();
- m->Push(continuation::Bind(
- [this, s](const continuation::Continuation::Next &next) {
- namenode_.GetBlockLocations(&s->req, s->resp, next);
- }));
+ auto resp = std::make_shared<GetBlockLocationsResponseProto>();
- m->Run([this, handler](const Status &stat, const State &s) {
+ namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
if (stat.ok()) {
auto file_info = std::make_shared<struct FileInfo>();
- auto locations = s.resp->locations();
+ auto locations = resp->locations();
file_info->file_length_ = locations.filelength();
file_info->last_block_complete_ = locations.islastblockcomplete();
@@ -117,11 +106,107 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
});
}
+void NameNodeOperations::GetFileInfo(const std::string & path,
+ std::function<void(const Status &, const StatInfo &)> handler)
+{
+ using ::hadoop::hdfs::GetFileInfoRequestProto;
+ using ::hadoop::hdfs::GetFileInfoResponseProto;
+
+ LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
+ << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+ GetFileInfoRequestProto req;
+ req.set_src(path);
+
+ auto resp = std::make_shared<GetFileInfoResponseProto>();
+
+ namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
+ if (stat.ok()) {
+ // For non-existent files, the server will respond with an OK message but
+ // no fs in the protobuf.
+ if(resp -> has_fs()){
+ struct StatInfo stat_info;
+ stat_info.path=path;
+ HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
+ handler(stat, stat_info);
+ } else {
+ std::string errormsg = "No such file or directory: " + path;
+ Status statNew = Status::PathNotFound(errormsg.c_str());
+ handler(statNew, StatInfo());
+ }
+ } else {
+ handler(stat, StatInfo());
+ }
+ });
+}
+
+void NameNodeOperations::GetListing(
+ const std::string & path,
+ std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
+ const std::string & start_after) {
+ using ::hadoop::hdfs::GetListingRequestProto;
+ using ::hadoop::hdfs::GetListingResponseProto;
+
+ LOG_TRACE(
+ kFileSystem,
+ << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+ GetListingRequestProto req;
+ req.set_src(path);
+ req.set_startafter(start_after.c_str());
+ req.set_needlocation(false);
+
+ auto resp = std::make_shared<GetListingResponseProto>();
+
+ namenode_.GetListing(
+ &req,
+ resp,
+ [resp, handler, path](const Status &stat) {
+ if (stat.ok()) {
+ if(resp -> has_dirlist()){
+ std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
+ for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
+ StatInfo si;
+ si.path=fs.path();
+ HdfsFileStatusProtoToStatInfo(si, fs);
+ stat_infos->push_back(si);
+ }
+ handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
+ } else {
+ std::string errormsg = "No such file or directory: " + path;
+ Status statNew = Status::PathNotFound(errormsg.c_str());
+ std::shared_ptr<std::vector<StatInfo>> stat_infos;
+ handler(statNew, stat_infos, false);
+ }
+ } else {
+ std::shared_ptr<std::vector<StatInfo>> stat_infos;
+ handler(stat, stat_infos, false);
+ }
+ });
+}
+
void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
engine_.SetFsEventCallback(callback);
}
+void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
+ hdfs::StatInfo & stat_info,
+ const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
+ stat_info.file_type = fs.filetype();
+ stat_info.length = fs.length();
+ stat_info.permissions = fs.permission().perm();
+ stat_info.owner = fs.owner();
+ stat_info.group = fs.group();
+ stat_info.modification_time = fs.modification_time();
+ stat_info.access_time = fs.access_time();
+ stat_info.symlink = fs.symlink();
+ stat_info.block_replication = fs.block_replication();
+ stat_info.blocksize = fs.blocksize();
+ stat_info.fileid = fs.fileid();
+ stat_info.children_num = fs.childrennum();
+}
+
/*****************************************************************************
* FILESYSTEM BASE CLASS
****************************************************************************/
@@ -339,7 +424,6 @@ Status FileSystemImpl::Open(const std::string &path,
return stat;
}
-
BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
{
BlockLocation result;
@@ -380,6 +464,10 @@ BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto
void FileSystemImpl::GetBlockLocations(const std::string & path,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
{
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations("
+ << FMT_THIS_ADDR << ", path="
+ << path << ") called");
+
auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
if (status.ok()) {
auto result = std::make_shared<FileBlockLocation>();
@@ -407,6 +495,10 @@ void FileSystemImpl::GetBlockLocations(const std::string & path,
Status FileSystemImpl::GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * fileBlockLocations)
{
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
+ << FMT_THIS_ADDR << ", path="
+ << path << ") called");
+
if (!fileBlockLocations)
return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
@@ -433,6 +525,125 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path,
return stat;
}
+void FileSystemImpl::GetFileInfo(
+ const std::string &path,
+ const std::function<void(const Status &, const StatInfo &)> &handler) {
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo("
+ << FMT_THIS_ADDR << ", path="
+ << path << ") called");
+
+ nn_.GetFileInfo(path, [handler](const Status &stat, const StatInfo &stat_info) {
+ handler(stat, stat_info);
+ });
+}
+
+Status FileSystemImpl::GetFileInfo(const std::string &path,
+ StatInfo & stat_info) {
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
+ << FMT_THIS_ADDR << ", path="
+ << path << ") called");
+
+ auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
+ std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());
+
+ /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
+ auto h = [callstate](const Status &s, const StatInfo &si) {
+ callstate->set_value(std::make_tuple(s, si));
+ };
+
+ GetFileInfo(path, h);
+
+ /* block until promise is set */
+ auto returnstate = future.get();
+ Status stat = std::get<0>(returnstate);
+ StatInfo info = std::get<1>(returnstate);
+
+ if (!stat.ok()) {
+ return stat;
+ }
+
+ stat_info = info;
+ return stat;
+}
+
+/**
+ * Helper function for recursive GetListing calls.
+ *
+ * Some compilers don't like recursive lambdas, so we make the lambda call a
+ * method, which in turn creates a lambda calling itself.
+ */
+void FileSystemImpl::GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
+ std::string path,
+ const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
+ bool has_next = stat_infos && stat_infos->size() > 0;
+ bool get_more = handler(stat, stat_infos, has_more && has_next);
+ if (get_more && has_more && has_next ) {
+ auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
+ GetListingShim(stat, stat_infos, has_more, path, handler);
+ };
+
+ std::string last = stat_infos->back().path;
+ nn_.GetListing(path, callback, last);
+ }
+}
+
+void FileSystemImpl::GetListing(
+ const std::string &path,
+ const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
+ LOG_INFO(kFileSystem, << "FileSystemImpl::GetListing("
+ << FMT_THIS_ADDR << ", path="
+ << path << ") called");
+
+ // Capture the state and push it into the shim
+ auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
+ GetListingShim(stat, stat_infos, has_more, path, handler);
+ };
+
+ nn_.GetListing(path, callback);
+}
+
+Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) {
+ LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]GetListing("
+ << FMT_THIS_ADDR << ", path="
+ << path << ") called");
+
+ // In this case, we're going to allocate the result on the heap and have the
+ // async code populate it.
+ auto results = std::make_shared<std::vector<StatInfo>>();
+
+ auto callstate = std::make_shared<std::promise<Status>>();
+ std::future<Status> future(callstate->get_future());
+
+ /* wrap async FileSystem::GetListing with promise to make it a blocking call.
+ *
+ Keep requesting more until we get the entire listing, and don't set the promise
+ * until we have the entire listing.
+ */
+ auto h = [callstate, results](const Status &s, std::shared_ptr<std::vector<StatInfo>> si, bool has_more) -> bool {
+ if (si) {
+ results->insert(results->end(), si->begin(), si->end());
+ }
+
+ bool done = !s.ok() || !has_more;
+ if (done) {
+ callstate->set_value(s);
+ return false;
+ }
+ return true;
+ };
+
+ GetListing(path, h);
+
+ /* block until promise is set */
+ Status stat = future.get();
+
+ if (!stat.ok()) {
+ return stat;
+ }
+
+ stat_infos = results;
+ return stat;
+}
void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 0a479a6..869fd2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -28,6 +28,7 @@
#include "rpc/rpc_engine.h"
#include "reader/block_reader.h"
#include "reader/fileinfo.h"
+#include "hdfspp/statinfo.h"
#include "ClientNamenodeProtocol.pb.h"
#include "ClientNamenodeProtocol.hrpc.inl"
@@ -64,8 +65,20 @@ public:
void GetBlockLocations(const std::string & path,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
+ void GetFileInfo(const std::string & path,
+ std::function<void(const Status &, const StatInfo &)> handler);
+
+ // start_after="" for initial call
+ void GetListing(const std::string & path,
+ std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
+ const std::string & start_after = "");
+
void SetFsEventCallback(fs_event_callback callback);
+
private:
+ static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
+ static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
+
::asio::io_service * io_service_;
RpcEngine engine_;
ClientNamenodeProtocol namenode_;
@@ -105,6 +118,17 @@ public:
&handler) override;
Status Open(const std::string &path, FileHandle **handle) override;
+ void GetFileInfo(
+ const std::string &path,
+ const std::function<void(const Status &, const StatInfo &)> &handler) override;
+
+ Status GetFileInfo(const std::string &path, StatInfo & stat_info) override;
+
+ void GetListing(
+ const std::string &path,
+ const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) override;
+
+ Status GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) override;
virtual void GetBlockLocations(const std::string & path,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
@@ -150,6 +174,11 @@ private:
* exposes implementation details that may change at any time.
**/
std::shared_ptr<LibhdfsEvents> event_handlers_;
+
+ void GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
+ std::string path,
+ const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler);
+
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f206a36c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
index cff837c..0737d08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
@@ -78,8 +78,7 @@ void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
}
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
- REPORT_FUNCTION_NOT_IMPLEMENTED
- return NULL;
+ return (hdfsFS) libhdfspp_hdfsConnectAsUser(nn, port, user);
}
hdfsFS hdfsConnect(const char* nn, tPort port) {
@@ -294,16 +293,16 @@ int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path,
int *numEntries) {
- return (hdfsFileInfo *)libhdfs_hdfsListDirectory(fs->libhdfsRep, path, numEntries);
+ return (hdfsFileInfo *)libhdfspp_hdfsListDirectory(fs->libhdfsppRep, path, numEntries);
}
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
- return (hdfsFileInfo *)libhdfs_hdfsGetPathInfo(fs->libhdfsRep, path);
+ return (hdfsFileInfo *)libhdfspp_hdfsGetPathInfo(fs->libhdfsppRep, path);
}
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) {
- return libhdfs_hdfsFreeFileInfo
- ((libhdfs_hdfsFileInfo *) hdfsFileInfo, numEntries);
+ return libhdfspp_hdfsFreeFileInfo
+ ((libhdfspp_hdfsFileInfo *) hdfsFileInfo, numEntries);
}
int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org