You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/10/29 06:10:29 UTC
svn commit: r1403173 [2/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/contrib/libwebhdfs/src/ src/main/native/libhdfs/
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c Mon Oct 29 05:10:29 2012
@@ -25,6 +25,11 @@
#include <ctype.h>
#include <jansson.h>
+static const char * const temporaryRedirectCode = "307 TEMPORARY_REDIRECT";
+static const char * const twoHundredOKCode = "200 OK";
+static const char * const twoHundredOneCreatedCode = "201 Created";
+static const char * const httpHeaderString = "HTTP/1.1";
+
/**
* Exception information after calling JSON operations
*/
@@ -34,9 +39,6 @@ struct jsonException {
const char *message;
};
-static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat,
- int *numEntries, const char *operation);
-
static void dotsToSlashes(char *str)
{
for (; *str != '\0'; str++) {
@@ -45,8 +47,9 @@ static void dotsToSlashes(char *str)
}
}
-int printJsonExceptionV(struct jsonException *exc, int noPrintFlags,
- const char *fmt, va_list ap)
+/** Print out the JSON exception information */
+static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags,
+ const char *fmt, va_list ap)
{
char *javaClassName = NULL;
int excErrno = EINTERNAL, shouldPrint = 0;
@@ -74,11 +77,23 @@ int printJsonExceptionV(struct jsonExcep
return excErrno;
}
-int printJsonException(struct jsonException *exc, int noPrintFlags,
- const char *fmt, ...)
+/**
+ * Print out JSON exception information.
+ *
+ * @param exc The exception information to print and free
+ * @param noPrintFlags Flags which determine which exceptions we should NOT
+ * print.
+ * @param fmt Printf-style format list
+ * @param ... Printf-style varargs
+ *
+ * @return The POSIX error number associated with the exception
+ * object.
+ */
+static int printJsonException(struct jsonException *exc, int noPrintFlags,
+ const char *fmt, ...)
{
va_list ap;
- int ret;
+ int ret = 0;
va_start(ap, fmt);
ret = printJsonExceptionV(exc, noPrintFlags, fmt, ap);
@@ -86,81 +101,20 @@ int printJsonException(struct jsonExcept
return ret;
}
-static hdfsFileInfo *json_parse_array(json_t *jobj, char *key, hdfsFileInfo *fileStat, int *numEntries, const char *operation) {
- int arraylen = json_array_size(jobj); //Getting the length of the array
- *numEntries = arraylen;
- if (!key) {
- return NULL;
- }
- if(arraylen > 0) {
- fileStat = (hdfsFileInfo *)realloc(fileStat,sizeof(hdfsFileInfo)*arraylen);
- }
- json_t *jvalue;
- int i;
- for (i=0; i< arraylen; i++) {
- jvalue = json_array_get(jobj, i); //Getting the array element at position i
- if (json_is_array(jvalue)) { // array within an array - program should never come here for now
- json_parse_array(jvalue, NULL, &fileStat[i], numEntries, operation);
- }
- else if (json_is_object(jvalue)) { // program will definitely come over here
- parseJsonGFS(jvalue, &fileStat[i], numEntries, operation);
- }
- else {
- return NULL; // program will never come over here for now
- }
- }
- *numEntries = arraylen;
- return fileStat;
-}
-
-int parseBoolean(char *response) {
- json_t *root;
- json_error_t error;
- size_t flags = 0;
- int result = 0;
- const char *key;
- json_t *value;
- root = json_loads(response, flags, &error);
- void *iter = json_object_iter(root);
- while(iter) {
- key = json_object_iter_key(iter);
- value = json_object_iter_value(iter);
- switch (json_typeof(value)) {
- case JSON_TRUE:
- result = 1;
- break;
- default:
- result = 0;
- break;
- }
- iter = json_object_iter_next(root, iter);
- }
- return result;
-}
-
-int parseMKDIR(char *response) {
- return (parseBoolean(response));
-}
-
-int parseRENAME(char *response) {
- return (parseBoolean(response));
-}
-
-int parseDELETE(char *response) {
- return (parseBoolean(response));
-}
-
-struct jsonException *parseJsonException(json_t *jobj) {
- const char *key;
- json_t *value;
+/** Parse the exception information from JSON */
+static struct jsonException *parseJsonException(json_t *jobj)
+{
+ const char *key = NULL;
+ json_t *value = NULL;
struct jsonException *exception = NULL;
+ void *iter = NULL;
exception = calloc(1, sizeof(*exception));
if (!exception) {
return NULL;
}
- void *iter = json_object_iter(jobj);
+ iter = json_object_iter(jobj);
while (iter) {
key = json_object_iter_key(iter);
value = json_object_iter_value(iter);
@@ -175,23 +129,31 @@ struct jsonException *parseJsonException
iter = json_object_iter_next(jobj, iter);
}
-
return exception;
}
-struct jsonException *parseException(const char *content) {
- if (!content) {
- return NULL;
- }
-
+/**
+ * Parse the exception information which is presented in JSON
+ *
+ * @param content Exception information in JSON
+ * @return jsonException for printing out
+ */
+static struct jsonException *parseException(const char *content)
+{
json_error_t error;
size_t flags = 0;
- const char *key;
+ const char *key = NULL;
json_t *value;
- json_t *jobj = json_loads(content, flags, &error);
+ json_t *jobj;
+ struct jsonException *exception = NULL;
+ if (!content) {
+ return NULL;
+ }
+ jobj = json_loads(content, flags, &error);
if (!jobj) {
- fprintf(stderr, "JSon parsing failed\n");
+ fprintf(stderr, "JSon parsing error: on line %d: %s\n",
+ error.line, error.text);
return NULL;
}
void *iter = json_object_iter(jobj);
@@ -199,254 +161,503 @@ struct jsonException *parseException(con
key = json_object_iter_key(iter);
value = json_object_iter_value(iter);
- if (!strcmp(key, "RemoteException") && json_typeof(value) == JSON_OBJECT) {
- return parseJsonException(value);
+ if (!strcmp(key, "RemoteException") &&
+ json_typeof(value) == JSON_OBJECT) {
+ exception = parseJsonException(value);
+ break;
}
iter = json_object_iter_next(jobj, iter);
}
- return NULL;
+
+ json_decref(jobj);
+ return exception;
}
-static hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat,
- int *numEntries, const char *operation)
+/**
+ * Parse the response information which uses TRUE/FALSE
+ * to indicate whether the operation succeeded
+ *
+ * @param response Response information
+ * @return 0 to indicate success
+ */
+static int parseBoolean(const char *response)
{
- const char *tempstr;
- const char *key;
- json_t *value;
- void *iter = json_object_iter(jobj);
- while(iter) {
- key = json_object_iter_key(iter);
- value = json_object_iter_value(iter);
-
- switch (json_typeof(value)) {
- case JSON_INTEGER:
- if(!strcmp(key,"accessTime")) {
- fileStat->mLastAccess = (time_t)(json_integer_value(value)/1000);
- } else if (!strcmp(key,"blockSize")) {
- fileStat->mBlockSize = (tOffset)json_integer_value(value);
- } else if (!strcmp(key,"length")) {
- fileStat->mSize = (tOffset)json_integer_value(value);
- } else if(!strcmp(key,"modificationTime")) {
- fileStat->mLastMod = (time_t)(json_integer_value(value)/1000);
- } else if (!strcmp(key,"replication")) {
- fileStat->mReplication = (short)json_integer_value(value);
- }
- break;
-
- case JSON_STRING:
- if(!strcmp(key,"group")) {
- fileStat->mGroup=(char *)json_string_value(value);
- } else if (!strcmp(key,"owner")) {
- fileStat->mOwner=(char *)json_string_value(value);
- } else if (!strcmp(key,"pathSuffix")) {
- fileStat->mName=(char *)json_string_value(value);
- } else if (!strcmp(key,"permission")) {
- tempstr=(char *)json_string_value(value);
- fileStat->mPermissions = (short)strtol(tempstr,(char **)NULL,8);
- } else if (!strcmp(key,"type")) {
- char *cvalue = (char *)json_string_value(value);
- if (!strcmp(cvalue, "DIRECTORY")) {
- fileStat->mKind = kObjectKindDirectory;
- } else {
- fileStat->mKind = kObjectKindFile;
- }
- }
- break;
-
- case JSON_OBJECT:
- if(!strcmp(key,"FileStatus")) {
- parseJsonGFS(value, fileStat, numEntries, operation);
- } else if (!strcmp(key,"FileStatuses")) {
- fileStat = parseJsonGFS(value, &fileStat[0], numEntries, operation);
- } else if (!strcmp(key,"RemoteException")) {
- //Besides returning NULL, we also need to print the exception information
- struct jsonException *exception = parseJsonException(value);
- if (exception) {
- errno = printJsonException(exception, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
- }
-
- if(fileStat != NULL) {
- free(fileStat);
- fileStat = NULL;
- }
- }
- break;
-
- case JSON_ARRAY:
- if (!strcmp(key,"FileStatus")) {
- fileStat = json_parse_array(value,(char *) key,fileStat,numEntries, operation);
- }
- break;
-
- default:
- if(fileStat != NULL) {
- free(fileStat);
- fileStat = NULL;
- }
- }
- iter = json_object_iter_next(jobj, iter);
+ json_t *root, *value;
+ json_error_t error;
+ size_t flags = 0;
+ int result = 0;
+
+ root = json_loads(response, flags, &error);
+ if (!root) {
+ fprintf(stderr, "JSon parsing error: on line %d: %s\n",
+ error.line, error.text);
+ return EIO;
+ }
+ void *iter = json_object_iter(root);
+ value = json_object_iter_value(iter);
+ if (json_typeof(value) == JSON_TRUE) {
+ result = 0;
+ } else {
+ result = EIO; // FALSE means error in remote NN/DN
}
- return fileStat;
+ json_decref(root);
+ return result;
}
+int parseMKDIR(const char *response)
+{
+ return parseBoolean(response);
+}
+
+int parseRENAME(const char *response)
+{
+ return parseBoolean(response);
+}
+
+int parseDELETE(const char *response)
+{
+ return parseBoolean(response);
+}
-int checkHeader(char *header, const char *content, const char *operation) {
+int parseSETREPLICATION(const char *response)
+{
+ return parseBoolean(response);
+}
+
+/**
+ * Check the header of response to see if it's 200 OK
+ *
+ * @param header Header information for checking
+ * @param content Stores exception information if there are errors
+ * @param operation Indicate the operation for exception printing
+ * @return 0 for success
+ */
+static int checkHeader(const char *header, const char *content,
+ const char *operation)
+{
char *result = NULL;
- char delims[] = ":";
- char *responseCode= "200 OK";
- if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) {
- return 0;
+ const char delims[] = ":";
+ char *savepter;
+ int ret = 0;
+
+ if (!header || strncmp(header, "HTTP/", strlen("HTTP/"))) {
+ return EINVAL;
}
- if(!(strstr(header, responseCode)) || !(header = strstr(header, "Content-Length"))) {
+ if (!(strstr(header, twoHundredOKCode)) ||
+ !(result = strstr(header, "Content-Length"))) {
struct jsonException *exc = parseException(content);
if (exc) {
- errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
+ ret = printJsonException(exc, PRINT_EXC_ALL,
+ "Calling WEBHDFS (%s)", operation);
+ } else {
+ ret = EIO;
}
- return 0;
+ return ret;
}
- result = strtok(header, delims);
- result = strtok(NULL,delims);
+ result = strtok_r(result, delims, &savepter);
+ result = strtok_r(NULL, delims, &savepter);
while (isspace(*result)) {
result++;
}
- if(strcmp(result,"0")) { //Content-Length should be equal to 0
- return 1;
- } else {
- return 0;
- }
-}
-
-int parseOPEN(const char *header, const char *content) {
- const char *responseCode1 = "307 TEMPORARY_REDIRECT";
- const char *responseCode2 = "200 OK";
- if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) {
- return -1;
- }
- if(!(strstr(header,responseCode1) && strstr(header, responseCode2))) {
- struct jsonException *exc = parseException(content);
- if (exc) {
- //if the exception is an IOException and it is because the offset is out of the range
- //do not print out the exception
- if (!strcasecmp(exc->exception, "IOException") && strstr(exc->message, "out of the range")) {
- return 0;
- }
- errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (OPEN)");
- }
- return -1;
+ // Content-Length should be equal to 0,
+ // and the string should be "0\r\nServer"
+ if (strncmp(result, "0\r\n", 3)) {
+ ret = EIO;
}
-
- return 1;
+ return ret;
}
-int parseCHMOD(char *header, const char *content) {
+int parseCHMOD(const char *header, const char *content)
+{
return checkHeader(header, content, "CHMOD");
}
-
-int parseCHOWN(char *header, const char *content) {
+int parseCHOWN(const char *header, const char *content)
+{
return checkHeader(header, content, "CHOWN");
}
-int parseUTIMES(char *header, const char *content) {
- return checkHeader(header, content, "UTIMES");
+int parseUTIMES(const char *header, const char *content)
+{
+ return checkHeader(header, content, "SETTIMES");
}
-
-int checkIfRedirect(const char *const headerstr, const char *content, const char *operation) {
- char *responseCode = "307 TEMPORARY_REDIRECT";
- char * locTag = "Location";
- char * tempHeader;
- if(headerstr == '\0' || strncmp(headerstr,"HTTP/", 5)) {
- return 0;
+/**
+ * Check if the header contains correct information
+ * ("307 TEMPORARY_REDIRECT" and "Location")
+ *
+ * @param header Header for parsing
+ * @param content Contains exception information
+ * if the remote operation failed
+ * @param operation Specify the remote operation when printing out exception
+ * @return 0 for success
+ */
+static int checkRedirect(const char *header,
+ const char *content, const char *operation)
+{
+ const char *locTag = "Location";
+ int ret = 0, offset = 0;
+
+ // The header must start with "HTTP/1.1"
+ if (!header || strncmp(header, httpHeaderString,
+ strlen(httpHeaderString))) {
+ return EINVAL;
}
- if(!(tempHeader = strstr(headerstr,responseCode))) {
- //process possible exception information
+
+ offset += strlen(httpHeaderString);
+ while (isspace(header[offset])) {
+ offset++;
+ }
+ // Looking for "307 TEMPORARY_REDIRECT" in header
+ if (strncmp(header + offset, temporaryRedirectCode,
+ strlen(temporaryRedirectCode))) {
+ // Process possible exception information
struct jsonException *exc = parseException(content);
if (exc) {
- errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
- }
- return 0;
- }
- if(!(strstr(tempHeader,locTag))) {
- return 0;
+ ret = printJsonException(exc, PRINT_EXC_ALL,
+ "Calling WEBHDFS (%s)", operation);
+ } else {
+ ret = EIO;
+ }
+ return ret;
+ }
+ // Here we just simply check if header contains "Location" tag,
+ // detailed processing is in parseDnLoc
+ if (!(strstr(header, locTag))) {
+ ret = EIO;
}
- return 1;
+ return ret;
}
-
-int parseNnWRITE(const char *header, const char *content) {
- return checkIfRedirect(header, content, "Write(NameNode)");
+int parseNnWRITE(const char *header, const char *content)
+{
+ return checkRedirect(header, content, "Write(NameNode)");
}
+int parseNnAPPEND(const char *header, const char *content)
+{
+ return checkRedirect(header, content, "Append(NameNode)");
+}
-int parseNnAPPEND(const char *header, const char *content) {
- return checkIfRedirect(header, content, "Append(NameNode)");
+/** 0 for success, -1 for out of range, other values for error */
+int parseOPEN(const char *header, const char *content)
+{
+ int ret = 0, offset = 0;
+
+ if (!header || strncmp(header, httpHeaderString,
+ strlen(httpHeaderString))) {
+ return EINVAL;
+ }
+
+ offset += strlen(httpHeaderString);
+ while (isspace(header[offset])) {
+ offset++;
+ }
+ if (strncmp(header + offset, temporaryRedirectCode,
+ strlen(temporaryRedirectCode)) ||
+ !strstr(header, twoHundredOKCode)) {
+ struct jsonException *exc = parseException(content);
+ if (exc) {
+ // If the exception is an IOException and it is because
+ // the offset is out of the range, do not print out the exception
+ if (!strcasecmp(exc->exception, "IOException") &&
+ strstr(exc->message, "out of the range")) {
+ ret = -1;
+ } else {
+ ret = printJsonException(exc, PRINT_EXC_ALL,
+ "Calling WEBHDFS (OPEN)");
+ }
+ } else {
+ ret = EIO;
+ }
+ }
+ return ret;
}
-char *parseDnLoc(char *content) {
- char delims[] = "\r\n";
- char *url = NULL;
- char *DnLocation = NULL;
- char *savepter;
- DnLocation = strtok_r(content, delims, &savepter);
- while (DnLocation && strncmp(DnLocation, "Location:", strlen("Location:"))) {
- DnLocation = strtok_r(NULL, delims, &savepter);
+int parseDnLoc(char *content, char **dn)
+{
+ char *url = NULL, *dnLocation = NULL, *savepter, *tempContent;
+ const char *prefix = "Location: http://";
+ const char *prefixToRemove = "Location: ";
+ const char *delims = "\r\n";
+
+ tempContent = strdup(content);
+ if (!tempContent) {
+ return ENOMEM;
}
- if (!DnLocation) {
- return NULL;
+
+ dnLocation = strtok_r(tempContent, delims, &savepter);
+ while (dnLocation && strncmp(dnLocation, "Location:",
+ strlen("Location:"))) {
+ dnLocation = strtok_r(NULL, delims, &savepter);
}
- DnLocation = strstr(DnLocation, "http");
- if (!DnLocation) {
- return NULL;
+ if (!dnLocation) {
+ return EIO;
+ }
+
+ while (isspace(*dnLocation)) {
+ dnLocation++;
}
- url = malloc(strlen(DnLocation) + 1);
+ if (strncmp(dnLocation, prefix, strlen(prefix))) {
+ return EIO;
+ }
+ url = strdup(dnLocation + strlen(prefixToRemove));
if (!url) {
- return NULL;
+ return ENOMEM;
}
- strcpy(url, DnLocation);
- return url;
+ *dn = url;
+ return 0;
}
-int parseDnWRITE(const char *header, const char *content) {
- char *responseCode = "201 Created";
- fprintf(stderr, "\nheaderstr is: %s\n", header);
- if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) {
- return 0;
+int parseDnWRITE(const char *header, const char *content)
+{
+ int ret = 0;
+ if (header == NULL || header[0] == '\0' ||
+ strncmp(header, "HTTP/", strlen("HTTP/"))) {
+ return EINVAL;
}
- if(!(strstr(header,responseCode))) {
+ if (!(strstr(header, twoHundredOneCreatedCode))) {
struct jsonException *exc = parseException(content);
if (exc) {
- errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (WRITE(DataNode))");
+ ret = printJsonException(exc, PRINT_EXC_ALL,
+ "Calling WEBHDFS (WRITE(DataNode))");
+ } else {
+ ret = EIO;
}
- return 0;
}
- return 1;
+ return ret;
}
-int parseDnAPPEND(const char *header, const char *content) {
- char *responseCode = "200 OK";
- if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) {
- return 0;
+int parseDnAPPEND(const char *header, const char *content)
+{
+ int ret = 0;
+
+ if (header == NULL || header[0] == '\0' ||
+ strncmp(header, "HTTP/", strlen("HTTP/"))) {
+ return EINVAL;
}
- if(!(strstr(header, responseCode))) {
+ if (!(strstr(header, twoHundredOKCode))) {
struct jsonException *exc = parseException(content);
if (exc) {
- errno = printJsonException(exc, PRINT_EXC_ALL, "Calling WEBHDFS (APPEND(DataNode))");
+ ret = printJsonException(exc, PRINT_EXC_ALL,
+ "Calling WEBHDFS (APPEND(DataNode))");
+ } else {
+ ret = EIO;
}
- return 0;
}
- return 1;
+ return ret;
}
-hdfsFileInfo *parseGFS(char *str, hdfsFileInfo *fileStat, int *numEntries) {
+/**
+ * Retrieve file status from the JSON object
+ *
+ * @param jobj JSON object for parsing, which contains
+ * file status information
+ * @param fileStat hdfsFileInfo handle to hold file status information
+ * @return 0 on success
+ */
+static int parseJsonForFileStatus(json_t *jobj, hdfsFileInfo *fileStat)
+{
+ const char *key, *tempstr;
+ json_t *value;
+ void *iter = NULL;
+
+ iter = json_object_iter(jobj);
+ while (iter) {
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
+
+ if (!strcmp(key, "accessTime")) {
+ // json field contains time in milliseconds,
+ // hdfsFileInfo is counted in seconds
+ fileStat->mLastAccess = json_integer_value(value) / 1000;
+ } else if (!strcmp(key, "blockSize")) {
+ fileStat->mBlockSize = json_integer_value(value);
+ } else if (!strcmp(key, "length")) {
+ fileStat->mSize = json_integer_value(value);
+ } else if (!strcmp(key, "modificationTime")) {
+ fileStat->mLastMod = json_integer_value(value) / 1000;
+ } else if (!strcmp(key, "replication")) {
+ fileStat->mReplication = json_integer_value(value);
+ } else if (!strcmp(key, "group")) {
+ fileStat->mGroup = strdup(json_string_value(value));
+ if (!fileStat->mGroup) {
+ return ENOMEM;
+ }
+ } else if (!strcmp(key, "owner")) {
+ fileStat->mOwner = strdup(json_string_value(value));
+ if (!fileStat->mOwner) {
+ return ENOMEM;
+ }
+ } else if (!strcmp(key, "pathSuffix")) {
+ fileStat->mName = strdup(json_string_value(value));
+ if (!fileStat->mName) {
+ return ENOMEM;
+ }
+ } else if (!strcmp(key, "permission")) {
+ tempstr = json_string_value(value);
+ fileStat->mPermissions = (short) strtol(tempstr, NULL, 8);
+ } else if (!strcmp(key, "type")) {
+ tempstr = json_string_value(value);
+ if (!strcmp(tempstr, "DIRECTORY")) {
+ fileStat->mKind = kObjectKindDirectory;
+ } else {
+ fileStat->mKind = kObjectKindFile;
+ }
+ }
+ // Go to the next key-value pair in the json object
+ iter = json_object_iter_next(jobj, iter);
+ }
+ return 0;
+}
+
+int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError)
+{
+ int ret = 0, printFlag;
json_error_t error;
size_t flags = 0;
- json_t *jobj = json_loads(str, flags, &error);
- fileStat = parseJsonGFS(jobj, fileStat, numEntries, "GETPATHSTATUS/LISTSTATUS");
- return fileStat;
+ json_t *jobj, *value;
+ const char *key;
+ void *iter = NULL;
+
+ if (!response || !fileStat) {
+ return EIO;
+ }
+ jobj = json_loads(response, flags, &error);
+ if (!jobj) {
+ fprintf(stderr, "error while parsing json: on line %d: %s\n",
+ error.line, error.text);
+ return EIO;
+ }
+ iter = json_object_iter(jobj);
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
+ if (json_typeof(value) == JSON_OBJECT) {
+ if (!strcmp(key, "RemoteException")) {
+ struct jsonException *exception = parseJsonException(value);
+ if (exception) {
+ if (printError) {
+ printFlag = PRINT_EXC_ALL;
+ } else {
+ printFlag = NOPRINT_EXC_FILE_NOT_FOUND |
+ NOPRINT_EXC_ACCESS_CONTROL |
+ NOPRINT_EXC_PARENT_NOT_DIRECTORY;
+ }
+ ret = printJsonException(exception, printFlag,
+ "Calling WEBHDFS GETFILESTATUS");
+ } else {
+ ret = EIO;
+ }
+ } else if (!strcmp(key, "FileStatus")) {
+ ret = parseJsonForFileStatus(value, fileStat);
+ } else {
+ ret = EIO;
+ }
+
+ } else {
+ ret = EIO;
+ }
+
+ json_decref(jobj);
+ return ret;
}
-int parseSETREPLICATION(char *response) {
- return (parseBoolean(response));
+/**
+ * Parse the JSON array. Called to parse the result of
+ * the LISTSTATUS operation. Thus each element of the JSON array is
+ * a JSON object with the information of a file entry contained
+ * in the folder.
+ *
+ * @param jobj The JSON array to be parsed
+ * @param fileStat The hdfsFileInfo handle used to
+ * store a group of file information
+ * @param numEntries Capture the number of files in the folder
+ * @return 0 for success
+ */
+static int parseJsonArrayForFileStatuses(json_t *jobj, hdfsFileInfo **fileStat,
+ int *numEntries)
+{
+ json_t *jvalue = NULL;
+ int i = 0, ret = 0, arraylen = 0;
+ hdfsFileInfo *fileInfo = NULL;
+
+ arraylen = (int) json_array_size(jobj);
+ if (arraylen > 0) {
+ fileInfo = calloc(arraylen, sizeof(hdfsFileInfo));
+ if (!fileInfo) {
+ return ENOMEM;
+ }
+ }
+ for (i = 0; i < arraylen; i++) {
+ //Getting the array element at position i
+ jvalue = json_array_get(jobj, i);
+ if (json_is_object(jvalue)) {
+ ret = parseJsonForFileStatus(jvalue, &fileInfo[i]);
+ if (ret) {
+ goto done;
+ }
+ } else {
+ ret = EIO;
+ goto done;
+ }
+ }
+done:
+ if (ret) {
+ free(fileInfo);
+ } else {
+ *numEntries = arraylen;
+ *fileStat = fileInfo;
+ }
+ return ret;
}
+int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries)
+{
+ int ret = 0;
+ json_error_t error;
+ size_t flags = 0;
+ json_t *jobj, *value;
+ const char *key;
+ void *iter = NULL;
+
+ if (!response || response[0] == '\0' || !fileStats) {
+ return EIO;
+ }
+ jobj = json_loads(response, flags, &error);
+ if (!jobj) {
+ fprintf(stderr, "error while parsing json: on line %d: %s\n",
+ error.line, error.text);
+ return EIO;
+ }
+
+ iter = json_object_iter(jobj);
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
+ if (json_typeof(value) == JSON_OBJECT) {
+ if (!strcmp(key, "RemoteException")) {
+ struct jsonException *exception = parseJsonException(value);
+ if (exception) {
+ ret = printJsonException(exception, PRINT_EXC_ALL,
+ "Calling WEBHDFS GETFILESTATUS");
+ } else {
+ ret = EIO;
+ }
+ } else if (!strcmp(key, "FileStatuses")) {
+ iter = json_object_iter(value);
+ value = json_object_iter_value(iter);
+ if (json_is_array(value)) {
+ ret = parseJsonArrayForFileStatuses(value, fileStats,
+ numOfEntries);
+ } else {
+ ret = EIO;
+ }
+ } else {
+ ret = EIO;
+ }
+ } else {
+ ret = EIO;
+ }
+
+ json_decref(jobj);
+ return ret;
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h Mon Oct 29 05:10:29 2012
@@ -18,41 +18,161 @@
#ifndef _HDFS_JSON_PARSER_H_
#define _HDFS_JSON_PARSER_H_
-struct jsonException;
+/**
+ * Parse the response for MKDIR request. The response uses TRUE/FALSE
+ * to indicate whether the operation succeeded.
+ *
+ * @param response The response information to parse.
+ * @return 0 for success
+ */
+int parseMKDIR(const char *response);
/**
- * Print out JSON exception information.
+ * Parse the response for RENAME request. The response uses TRUE/FALSE
+ * to indicate whether the operation succeeded.
*
- * @param exc The exception information to print and free
- * @param noPrintFlags Flags which determine which exceptions we should NOT
- * print.
- * @param fmt Printf-style format list
- * @param ... Printf-style varargs
+ * @param response The response information to parse.
+ * @return 0 for success
+ */
+int parseRENAME(const char *response);
+
+/**
+ * Parse the response for DELETE request. The response uses TRUE/FALSE
+ * to indicate whether the operation succeeded.
*
- * @return The POSIX error number associated with the exception
- * object.
+ * @param response The response information to parse.
+ * @return 0 for success
*/
-int printJsonException(struct jsonException *exc, int noPrintFlags,
- const char *fmt, ...);
+int parseDELETE(const char *response);
-int parseMKDIR(char *response);
-int parseRENAME(char *response);
-int parseDELETE (char *response);
-int parseSETREPLICATION(char *response);
+/**
+ * Parse the response for SETREPLICATION request. The response uses TRUE/FALSE
+ * to indicate whether the operation succeeded.
+ *
+ * @param response The response information to parse.
+ * @return 0 for success
+ */
+int parseSETREPLICATION(const char *response);
+/**
+ * Parse the response for OPEN (read) request. A successful operation
+ * will return "200 OK".
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message is stored in content
+ * @return 0 for success, -1 for out of range, other values for error
+ */
int parseOPEN(const char *header, const char *content);
+/**
+ * Parse the response for WRITE (from NameNode) request.
+ * A successful operation should return "307 TEMPORARY_REDIRECT" in its header.
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message
+ * sent from NameNode is stored in content
+ * @return 0 for success
+ */
int parseNnWRITE(const char *header, const char *content);
+
+/**
+ * Parse the response for WRITE (from DataNode) request.
+ * A successful operation should return "201 Created" in its header.
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message
+ * sent from DataNode is stored in content
+ * @return 0 for success
+ */
int parseDnWRITE(const char *header, const char *content);
+
+/**
+ * Parse the response for APPEND (sent from NameNode) request.
+ * A successful operation should return "307 TEMPORARY_REDIRECT" in its header.
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message
+ * sent from NameNode is stored in content
+ * @return 0 for success
+ */
int parseNnAPPEND(const char *header, const char *content);
+
+/**
+ * Parse the response for APPEND (from DataNode) request.
+ * A successful operation should return "200 OK" in its header.
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message
+ * sent from DataNode is stored in content
+ * @return 0 for success
+ */
int parseDnAPPEND(const char *header, const char *content);
-char* parseDnLoc(char *content);
+/**
+ * Parse the response (from NameNode) to get the location information
+ * of the DataNode that should be contacted for the following write operation.
+ *
+ * @param content Content of the http header
+ * @param dn To store the location of the DataNode for writing
+ * @return 0 for success
+ */
+int parseDnLoc(char *content, char **dn) __attribute__ ((warn_unused_result));
+
+/**
+ * Parse the response for GETFILESTATUS operation.
+ *
+ * @param response Response to parse. Its detailed format is specified in
+ * "http://hadoop.apache.org/docs/stable/webhdfs.html#GETFILESTATUS"
+ * @param fileStat An hdfsFileInfo handle for holding file information
+ * @param printError Whether or not to print out the exception
+ * when the file does not exist
+ * @return 0 for success, non-zero value to indicate error
+ */
+int parseGFS(const char *response, hdfsFileInfo *fileStat, int printError);
-hdfsFileInfo *parseGFS(char *response, hdfsFileInfo *fileStat, int *numEntries);
+/**
+ * Parse the response for LISTSTATUS operation.
+ *
+ * @param response Response to parse. Its detailed format is specified in
+ * "http://hadoop.apache.org/docs/r1.0.3/webhdfs.html#LISTSTATUS"
+ * @param fileStats Pointer pointing to a list of hdfsFileInfo handles
+ * holding file/dir information in the directory
+ * @param numOfEntries After parsing, the value of this parameter indicates
+ * the number of file entries.
+ * @return 0 for success, non-zero value to indicate error
+ */
+int parseLS(const char *response, hdfsFileInfo **fileStats, int *numOfEntries);
-int parseCHOWN (char *header, const char *content);
-int parseCHMOD (char *header, const char *content);
-int parseUTIMES(char *header, const char *content);
+/**
+ * Parse the response for CHOWN request.
+ * A successful operation should contain "200 OK" in its header,
+ * and the Content-Length should be 0.
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message is stored in content
+ * @return 0 for success
+ */
+int parseCHOWN(const char *header, const char *content);
+
+/**
+ * Parse the response for CHMOD request.
+ * A successful operation should contain "200 OK" in its header,
+ * and the Content-Length should be 0.
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message is stored in content
+ * @return 0 for success
+ */
+int parseCHMOD(const char *header, const char *content);
+
+/**
+ * Parse the response for SETTIMES request.
+ * A successful operation should contain "200 OK" in its header,
+ * and the Content-Length should be 0.
+ *
+ * @param header The header of the http response
+ * @param content If failing, the exception message is stored in content
+ * @return 0 for success
+ */
+int parseUTIMES(const char *header, const char *content);
-#endif //_FUSE_JSON_PARSER_H
+#endif //_HDFS_JSON_PARSER_H_
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c Mon Oct 29 05:10:29 2012
@@ -16,6 +16,10 @@
* limitations under the License.
*/
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+
#include "exception.h"
#include "hdfs.h"
#include "hdfs_http_client.h"
@@ -23,15 +27,9 @@
#include "hdfs_json_parser.h"
#include "jni_helper.h"
-#include <inttypes.h>
-#include <jni.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration"
-#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
-#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
+#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration"
+#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
+#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
struct hdfsBuilder {
int forceNewInstance;
@@ -65,30 +63,70 @@ struct hdfs_internal {
*/
struct hdfsFile_internal {
struct webhdfsFileHandle* file;
- enum hdfsStreamType type;
- int flags;
- tOffset offset;
+ enum hdfsStreamType type; /* INPUT or OUTPUT */
+ int flags; /* Flag indicate read/create/append etc. */
+ tOffset offset; /* Current offset position in the file */
};
-static webhdfsBuffer *initWebHdfsBuffer(void)
+/**
+ * Create, initialize and return a webhdfsBuffer
+ */
+static int initWebHdfsBuffer(struct webhdfsBuffer **webhdfsBuffer)
{
- webhdfsBuffer *buffer = calloc(1, sizeof(*buffer));
+ int ret = 0;
+ struct webhdfsBuffer *buffer = calloc(1, sizeof(struct webhdfsBuffer));
if (!buffer) {
- fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n");
- return NULL;
+ fprintf(stderr,
+ "ERROR: fail to allocate memory for webhdfsBuffer.\n");
+ return ENOMEM;
}
- buffer->remaining = 0;
- buffer->offset = 0;
- buffer->wbuffer = NULL;
- buffer->closeFlag = 0;
- buffer->openFlag = 0;
- pthread_mutex_init(&buffer->writeMutex, NULL);
- pthread_cond_init(&buffer->newwrite_or_close, NULL);
- pthread_cond_init(&buffer->transfer_finish, NULL);
- return buffer;
+ ret = pthread_mutex_init(&buffer->writeMutex, NULL);
+ if (ret) {
+ fprintf(stderr, "ERROR: fail in pthread_mutex_init for writeMutex "
+ "in initWebHdfsBuffer, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
+ goto done;
+ }
+ ret = pthread_cond_init(&buffer->newwrite_or_close, NULL);
+ if (ret) {
+ fprintf(stderr,
+ "ERROR: fail in pthread_cond_init for newwrite_or_close "
+ "in initWebHdfsBuffer, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
+ goto done;
+ }
+ ret = pthread_cond_init(&buffer->transfer_finish, NULL);
+ if (ret) {
+ fprintf(stderr,
+ "ERROR: fail in pthread_cond_init for transfer_finish "
+ "in initWebHdfsBuffer, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
+ goto done;
+ }
+
+done:
+ if (ret) {
+ free(buffer);
+ return ret;
+ }
+ *webhdfsBuffer = buffer;
+ return 0;
}
-static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, size_t length) {
+/**
+ * Reset the webhdfsBuffer. This is used in a blocking way
+ * when hdfsWrite is called with a new buffer to write.
+ * The writing thread in libcurl will be woken up to continue writing,
+ * and the caller of this function is blocked waiting for writing to finish.
+ *
+ * @param wb The handle of the webhdfsBuffer
+ * @param buffer The buffer provided by user to write
+ * @param length The length of bytes to write
+ * @return Updated webhdfsBuffer.
+ */
+static struct webhdfsBuffer *resetWebhdfsBuffer(struct webhdfsBuffer *wb,
+ const char *buffer, size_t length)
+{
if (buffer && length > 0) {
pthread_mutex_lock(&wb->writeMutex);
wb->wbuffer = buffer;
@@ -103,35 +141,49 @@ static webhdfsBuffer *resetWebhdfsBuffer
return wb;
}
-static void freeWebhdfsBuffer(webhdfsBuffer *buffer) {
+/**
+ * Free the webhdfsBuffer and destroy its pthread conditions/mutex
+ * @param buffer The webhdfsBuffer to free
+ */
+static void freeWebhdfsBuffer(struct webhdfsBuffer *buffer)
+{
+ int ret = 0;
if (buffer) {
- int des = pthread_cond_destroy(&buffer->newwrite_or_close);
- if (des == EBUSY) {
- fprintf(stderr, "The condition newwrite_or_close is still referenced!\n");
- } else if (des == EINVAL) {
- fprintf(stderr, "The condition newwrite_or_close is invalid!\n");
- }
- des = pthread_cond_destroy(&buffer->transfer_finish);
- if (des == EBUSY) {
- fprintf(stderr, "The condition transfer_finish is still referenced!\n");
- } else if (des == EINVAL) {
- fprintf(stderr, "The condition transfer_finish is invalid!\n");
+ ret = pthread_cond_destroy(&buffer->newwrite_or_close);
+ if (ret) {
+ fprintf(stderr,
+ "WARN: fail in pthread_cond_destroy for newwrite_or_close "
+ "in freeWebhdfsBuffer, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
+ errno = ret;
}
- if (des == EBUSY) {
- fprintf(stderr, "The condition close_clean is still referenced!\n");
- } else if (des == EINVAL) {
- fprintf(stderr, "The condition close_clean is invalid!\n");
+ ret = pthread_cond_destroy(&buffer->transfer_finish);
+ if (ret) {
+ fprintf(stderr,
+ "WARN: fail in pthread_cond_destroy for transfer_finish "
+ "in freeWebhdfsBuffer, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
+ errno = ret;
}
- des = pthread_mutex_destroy(&buffer->writeMutex);
- if (des == EBUSY) {
- fprintf(stderr, "The mutex is still locked or referenced!\n");
+ ret = pthread_mutex_destroy(&buffer->writeMutex);
+ if (ret) {
+ fprintf(stderr,
+ "WARN: fail in pthread_mutex_destroy for writeMutex "
+ "in freeWebhdfsBuffer, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
+ errno = ret;
}
free(buffer);
buffer = NULL;
}
}
-static void freeWebFileHandle(struct webhdfsFileHandle * handle) {
+/**
+ * To free the webhdfsFileHandle, which includes a webhdfsBuffer and strings
+ * @param handle The webhdfsFileHandle to free
+ */
+static void freeWebFileHandle(struct webhdfsFileHandle * handle)
+{
if (!handle)
return;
freeWebhdfsBuffer(handle->uploadBuffer);
@@ -140,11 +192,46 @@ static void freeWebFileHandle(struct web
free(handle);
}
+static const char *maybeNull(const char *str)
+{
+ return str ? str : "(NULL)";
+}
+
+/** To print a hdfsBuilder as string */
+static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
+ char *buf, size_t bufLen)
+{
+ int strlength = snprintf(buf, bufLen, "nn=%s, port=%d, "
+ "kerbTicketCachePath=%s, userName=%s",
+ maybeNull(bld->nn), bld->port,
+ maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
+ if (strlength < 0 || strlength >= bufLen) {
+ fprintf(stderr, "failed to print a hdfsBuilder as string.\n");
+ return NULL;
+ }
+ return buf;
+}
+
+/**
+ * Free a hdfs_internal handle
+ * @param fs The hdfs_internal handle to free
+ */
+static void freeWebHdfsInternal(struct hdfs_internal *fs)
+{
+ if (fs) {
+ free(fs->nn);
+ free(fs->userName);
+ free(fs->workingDir);
+ }
+}
+
struct hdfsBuilder *hdfsNewBuilder(void)
{
struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
- if (!bld)
+ if (!bld) {
+ errno = ENOMEM;
return NULL;
+ }
return bld;
}
@@ -206,12 +293,7 @@ hdfsFS hdfsConnect(const char* nn, tPort
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port)
{
- struct hdfsBuilder* bld = (struct hdfsBuilder *) hdfsConnect(nn, port);
- if (!bld) {
- return NULL;
- }
- hdfsBuilderSetForceNewInstance(bld);
- return hdfsBuilderConnect(bld);
+ return hdfsConnect(nn, port);
}
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
@@ -227,30 +309,16 @@ hdfsFS hdfsConnectAsUserNewInstance(cons
return hdfsBuilderConnect(bld);
}
-static const char *maybeNull(const char *str)
-{
- return str ? str : "(NULL)";
-}
-
-static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
- char *buf, size_t bufLen)
-{
- snprintf(buf, bufLen, "nn=%s, port=%d, "
- "kerbTicketCachePath=%s, userName=%s",
- maybeNull(bld->nn), bld->port,
- maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
- return buf;
-}
-
-static void freeWebHdfsInternal(struct hdfs_internal *fs)
-{
- if (fs) {
- free(fs->nn);
- free(fs->userName);
- free(fs->workingDir);
- }
-}
-
+/**
+ * To retrieve the default configuration value for NameNode's hostName and port
+ * TODO: This function currently is using JNI,
+ * we need to do this without using JNI (HDFS-3917)
+ *
+ * @param bld The hdfsBuilder handle
+ * @param port Used to get the default value for NameNode's port
+ * @param nn Used to get the default value for NameNode's hostName
+ * @return 0 for success and non-zero value for failure
+ */
static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port,
char **nn)
{
@@ -262,13 +330,11 @@ static int retrieveDefaults(const struct
int ret = 0;
char buf[512];
- // TODO: can we do this without using JNI? See HDFS-3917
env = getJNIEnv();
if (!env) {
return EINTERNAL;
}
- // jHDFSConf = new HDFSConfiguration();
jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
@@ -277,12 +343,14 @@ static int retrieveDefaults(const struct
goto done;
}
- jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress",
- "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
- jHDFSConf);
+ jthr = invokeMethod(env, &jVal, STATIC, NULL,
+ HADOOP_NAMENODE, "getHttpAddress",
+ "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
+ jHDFSConf);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ "hdfsBuilderConnect(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jAddress = jVal.l;
@@ -298,7 +366,8 @@ static int retrieveDefaults(const struct
*port = jVal.i;
jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
- JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;");
+ JAVA_INETSOCKETADDRESS,
+ "getHostName", "()Ljava/lang/String;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
@@ -324,7 +393,7 @@ done:
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
{
struct hdfs_internal *fs = NULL;
- int ret;
+ int ret = 0;
if (!bld) {
ret = EINVAL;
@@ -341,8 +410,8 @@ hdfsFS hdfsBuilderConnect(struct hdfsBui
ret = ENOMEM;
goto done;
}
- /* If the namenode is "default" and/or the port of namenode is 0, get the
- * default namenode/port */
+ // If the namenode is "default" and/or the port of namenode is 0,
+ // get the default namenode/port
if (bld->port == 0 || !strcasecmp("default", bld->nn)) {
ret = retrieveDefaults(bld, &fs->port, &fs->nn);
if (ret)
@@ -369,7 +438,7 @@ hdfsFS hdfsBuilderConnect(struct hdfsBui
ret = ENOMEM;
goto done;
}
- //for debug
+ // For debug
fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port);
done:
@@ -392,47 +461,68 @@ int hdfsDisconnect(hdfsFS fs)
return 0;
}
-static char *getAbsolutePath(hdfsFS fs, const char *path)
+/**
+ * Based on the working directory stored in hdfsFS,
+ * generate the absolute path for the given path
+ *
+ * @param fs The hdfsFS handle which stores the current working directory
+ * @param path The given path which may not be an absolute path
+ * @param absPath To hold generated absolute path for the given path
+ * @return 0 on success, non-zero value indicating error
+ */
+static int getAbsolutePath(hdfsFS fs, const char *path, char **absPath)
{
- char *absPath = NULL;
+ char *tempPath = NULL;
size_t absPathLen;
+ int strlength;
if (path[0] == '/') {
- // path is already absolute.
- return strdup(path);
+ // Path is already absolute.
+ tempPath = strdup(path);
+ if (!tempPath) {
+ return ENOMEM;
+ }
+ *absPath = tempPath;
+ return 0;
}
- // prepend the workingDir to the path.
- absPathLen = strlen(fs->workingDir) + strlen(path);
- absPath = malloc(absPathLen + 1);
- if (!absPath) {
- return NULL;
+ // Prepend the workingDir to the path.
+ absPathLen = strlen(fs->workingDir) + strlen(path) + 1;
+ tempPath = malloc(absPathLen);
+ if (!tempPath) {
+ return ENOMEM;
+ }
+ strlength = snprintf(tempPath, absPathLen, "%s%s", fs->workingDir, path);
+ if (strlength < 0 || strlength >= absPathLen) {
+ free(tempPath);
+ return EIO;
}
- snprintf(absPath, absPathLen + 1, "%s%s", fs->workingDir, path);
- return absPath;
+ *absPath = tempPath;
+ return 0;
}
int hdfsCreateDirectory(hdfsFS fs, const char* path)
{
char *url = NULL, *absPath = NULL;
- Response resp = NULL;
+ struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
goto done;
}
- if(!((url = prepareMKDIR(fs->nn, fs->port, absPath, fs->userName))
- && (resp = launchMKDIR(url))
- && (parseMKDIR(resp->body->content)))) {
- ret = EIO;
+ ret = createUrlForMKDIR(fs->nn, fs->port, absPath, fs->userName, &url);
+ if (ret) {
goto done;
}
-
+ ret = launchMKDIR(url, &resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseMKDIR(resp->body->content);
done:
freeResponse(resp);
free(url);
@@ -447,24 +537,27 @@ done:
int hdfsChmod(hdfsFS fs, const char* path, short mode)
{
char *absPath = NULL, *url = NULL;
- Response resp = NULL;
+ struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
goto done;
}
- if(!((url = prepareCHMOD(fs->nn, fs->port, absPath, (int)mode, fs->userName))
- && (resp = launchCHMOD(url))
- && (parseCHMOD(resp->header->content, resp->body->content)))) {
- ret = EIO;
+ ret = createUrlForCHMOD(fs->nn, fs->port, absPath, (int) mode,
+ fs->userName, &url);
+ if (ret) {
goto done;
}
+ ret = launchCHMOD(url, &resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseCHMOD(resp->header->content, resp->body->content);
done:
freeResponse(resp);
free(absPath);
@@ -480,26 +573,27 @@ int hdfsChown(hdfsFS fs, const char* pat
{
int ret = 0;
char *absPath = NULL, *url = NULL;
- Response resp = NULL;
+ struct Response *resp = NULL;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
goto done;
}
-
- if(!((url = prepareCHOWN(fs->nn, fs->port, absPath, owner, group, fs->userName))
- && (resp = launchCHOWN(url))
- && (parseCHOWN(resp->header->content, resp->body->content)))) {
- ret = EIO;
+ ret = createUrlForCHOWN(fs->nn, fs->port, absPath,
+ owner, group, fs->userName, &url);
+ if (ret) {
goto done;
}
-
+ ret = launchCHOWN(url, &resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseCHOWN(resp->header->content, resp->body->content);
done:
freeResponse(resp);
free(absPath);
@@ -515,27 +609,30 @@ int hdfsRename(hdfsFS fs, const char* ol
{
char *oldAbsPath = NULL, *newAbsPath = NULL, *url = NULL;
int ret = 0;
- Response resp = NULL;
+ struct Response *resp = NULL;
if (fs == NULL || oldPath == NULL || newPath == NULL) {
ret = EINVAL;
goto done;
}
- oldAbsPath = getAbsolutePath(fs, oldPath);
- if (!oldAbsPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, oldPath, &oldAbsPath);
+ if (ret) {
goto done;
}
- newAbsPath = getAbsolutePath(fs, newPath);
- if (!newAbsPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, newPath, &newAbsPath);
+ if (ret) {
goto done;
}
- if(!((url = prepareRENAME(fs->nn, fs->port, oldAbsPath, newAbsPath, fs->userName))
- && (resp = launchRENAME(url))
- && (parseRENAME(resp->body->content)))) {
- ret = -1;
+ ret = createUrlForRENAME(fs->nn, fs->port, oldAbsPath,
+ newAbsPath, fs->userName, &url);
+ if (ret) {
+ goto done;
}
+ ret = launchRENAME(url, &resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseRENAME(resp->body->content);
done:
freeResponse(resp);
free(oldAbsPath);
@@ -548,12 +645,22 @@ done:
return 0;
}
-hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
+/**
+ * Get the file status for a given path.
+ *
+ * @param fs hdfsFS handle containing
+ * NameNode hostName/port information
+ * @param path Path for file
+ * @param printError Whether or not to print out error information
+ * (mainly remote FileNotFoundException)
+ * @return File information for the given path
+ */
+static hdfsFileInfo *hdfsGetPathInfoImpl(hdfsFS fs, const char* path,
+ int printError)
{
char *absPath = NULL;
char *url=NULL;
- Response resp = NULL;
- int numEntries = 0;
+ struct Response *resp = NULL;
int ret = 0;
hdfsFileInfo *fileInfo = NULL;
@@ -561,9 +668,8 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs,
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
goto done;
}
fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
@@ -573,18 +679,21 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs,
}
fileInfo->mKind = kObjectKindFile;
- if(!((url = prepareGFS(fs->nn, fs->port, absPath, fs->userName))
- && (resp = launchGFS(url))
- && (fileInfo = parseGFS(resp->body->content, fileInfo, &numEntries)))) {
- ret = EIO;
+ ret = createUrlForGetFileStatus(fs->nn, fs->port, absPath,
+ fs->userName, &url);
+ if (ret) {
goto done;
}
+ ret = launchGFS(url, &resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseGFS(resp->body->content, fileInfo, printError);
done:
freeResponse(resp);
free(absPath);
free(url);
-
if (ret == 0) {
return fileInfo;
} else {
@@ -594,10 +703,15 @@ done:
}
}
+hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
+{
+ return hdfsGetPathInfoImpl(fs, path, 1);
+}
+
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
{
char *url = NULL, *absPath = NULL;
- Response resp = NULL;
+ struct Response *resp = NULL;
int ret = 0;
hdfsFileInfo *fileInfo = NULL;
@@ -605,9 +719,8 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS f
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
goto done;
}
fileInfo = calloc(1, sizeof(*fileInfo));
@@ -615,21 +728,24 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS f
ret = ENOMEM;
goto done;
}
- if(!((url = prepareLS(fs->nn, fs->port, absPath, fs->userName))
- && (resp = launchLS(url))
- && (fileInfo = parseGFS(resp->body->content, fileInfo, numEntries)))) {
- ret = EIO;
+
+ ret = createUrlForLS(fs->nn, fs->port, absPath, fs->userName, &url);
+ if (ret) {
+ goto done;
+ }
+ ret = launchLS(url, &resp);
+ if (ret) {
goto done;
}
+ ret = parseLS(resp->body->content, &fileInfo, numEntries);
+
done:
freeResponse(resp);
free(absPath);
free(url);
-
if (ret == 0) {
return fileInfo;
} else {
- hdfsFreeFileInfo(fileInfo, 1);
errno = ret;
return NULL;
}
@@ -638,24 +754,28 @@ done:
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
{
char *url = NULL, *absPath = NULL;
- Response resp = NULL;
+ struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
goto done;
}
- if(!((url = prepareSETREPLICATION(fs->nn, fs->port, absPath, replication, fs->userName))
- && (resp = launchSETREPLICATION(url))
- && (parseSETREPLICATION(resp->body->content)))) {
- ret = EIO;
+
+ ret = createUrlForSETREPLICATION(fs->nn, fs->port, absPath,
+ replication, fs->userName, &url);
+ if (ret) {
goto done;
}
+ ret = launchSETREPLICATION(url, &resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseSETREPLICATION(resp->body->content);
done:
freeResponse(resp);
free(absPath);
@@ -670,8 +790,7 @@ done:
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
{
int i;
-
- for (i=0; i < numEntries; ++i) {
+ for (i = 0; i < numEntries; ++i) {
free(hdfsFileInfo[i].mName);
free(hdfsFileInfo[i].mOwner);
free(hdfsFileInfo[i].mGroup);
@@ -682,25 +801,28 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfs
int hdfsDelete(hdfsFS fs, const char* path, int recursive)
{
char *url = NULL, *absPath = NULL;
- Response resp = NULL;
+ struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
goto done;
}
- if(!((url = prepareDELETE(fs->nn, fs->port, absPath, recursive, fs->userName))
- && (resp = launchDELETE(url))
- && (parseDELETE(resp->body->content)))) {
- ret = EIO;
+
+ ret = createUrlForDELETE(fs->nn, fs->port, absPath,
+ recursive, fs->userName, &url);
+ if (ret) {
goto done;
}
-
+ ret = launchDELETE(url, &resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseDELETE(resp->body->content);
done:
freeResponse(resp);
free(absPath);
@@ -715,26 +837,28 @@ done:
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
{
char *url = NULL, *absPath = NULL;
- Response resp = NULL;
+ struct Response *resp = NULL;
int ret = 0;
if (fs == NULL || path == NULL) {
ret = EINVAL;
goto done;
}
- absPath = getAbsolutePath(fs, path);
- if (!absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &absPath);
+ if (ret) {
+ goto done;
+ }
+
+ ret = createUrlForUTIMES(fs->nn, fs->port, absPath, mtime, atime,
+ fs->userName, &url);
+ if (ret) {
goto done;
}
- if(!((url = prepareUTIMES(fs->nn, fs->port, absPath, mtime, atime,
- fs->userName))
- && (resp = launchUTIMES(url))
- && (parseUTIMES(resp->header->content, resp->body->content)))) {
- ret = EIO;
+ ret = launchUTIMES(url, &resp);
+ if (ret) {
goto done;
}
-
+ ret = parseUTIMES(resp->header->content, resp->body->content);
done:
freeResponse(resp);
free(absPath);
@@ -748,7 +872,7 @@ done:
int hdfsExists(hdfsFS fs, const char *path)
{
- hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, path);
+ hdfsFileInfo *fileInfo = hdfsGetPathInfoImpl(fs, path, 0);
if (!fileInfo) {
// (errno will have been set by hdfsGetPathInfo)
return -1;
@@ -757,14 +881,23 @@ int hdfsExists(hdfsFS fs, const char *pa
return 0;
}
+/**
+ * The information held by the thread which writes data to hdfs through http
+ */
typedef struct {
- char *url;
- webhdfsBuffer *uploadBuffer;
- int flags;
- Response resp;
+ char *url; /* the url of the target datanode for writing*/
+ struct webhdfsBuffer *uploadBuffer; /* buffer storing data to write */
+ int flags; /* flag indicating writing mode: create or append */
+ struct Response *resp; /* response from the target datanode */
} threadData;
-static void freeThreadData(threadData *data) {
+/**
+ * Free the threadData struct instance,
+ * including the response and url contained in it
+ * @param data The threadData instance to free
+ */
+static void freeThreadData(threadData *data)
+{
if (data) {
if (data->url) {
free(data->url);
@@ -772,18 +905,29 @@ static void freeThreadData(threadData *d
if (data->resp) {
freeResponse(data->resp);
}
- //the uploadBuffer would be freed by freeWebFileHandle()
+ // The uploadBuffer would be freed by freeWebFileHandle()
free(data);
data = NULL;
}
}
-static void *writeThreadOperation(void *v) {
- threadData *data = (threadData *) v;
+/**
+ * The action of the thread that writes data to
+ * the target datanode for hdfsWrite.
+ * The writing can be either create or append, which is specified by flag
+ */
+static void *writeThreadOperation(void *v)
+{
+ int ret = 0;
+ threadData *data = v;
if (data->flags & O_APPEND) {
- data->resp = launchDnAPPEND(data->url, data->uploadBuffer);
+ ret = launchDnAPPEND(data->url, data->uploadBuffer, &(data->resp));
} else {
- data->resp = launchDnWRITE(data->url, data->uploadBuffer);
+ ret = launchDnWRITE(data->url, data->uploadBuffer, &(data->resp));
+ }
+ if (ret) {
+ fprintf(stderr, "Failed to write to datanode %s, <%d>: %s.\n",
+ data->url, ret, hdfs_strerror(ret));
}
return data;
}
@@ -816,58 +960,58 @@ static void freeFileInternal(hdfsFile fi
static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file)
{
struct webhdfsFileHandle *webhandle = file->file;
- Response resp = NULL;
- int parseRet, append, ret = 0;
- char *prepareUrl = NULL, *dnUrl = NULL;
+ struct Response *resp = NULL;
+ int append, ret = 0;
+ char *nnUrl = NULL, *dnUrl = NULL;
threadData *data = NULL;
- webhandle->uploadBuffer = initWebHdfsBuffer();
- if (!webhandle->uploadBuffer) {
- ret = ENOMEM;
+ ret = initWebHdfsBuffer(&webhandle->uploadBuffer);
+ if (ret) {
goto done;
}
append = file->flags & O_APPEND;
if (!append) {
// If we're not appending, send a create request to the NN
- prepareUrl = prepareNnWRITE(fs->nn, fs->port, webhandle->absPath,
- fs->userName, webhandle->replication, webhandle->blockSize);
+ ret = createUrlForNnWRITE(fs->nn, fs->port, webhandle->absPath,
+ fs->userName, webhandle->replication,
+ webhandle->blockSize, &nnUrl);
} else {
- prepareUrl = prepareNnAPPEND(fs->nn, fs->port, webhandle->absPath,
- fs->userName);
+ ret = createUrlForNnAPPEND(fs->nn, fs->port, webhandle->absPath,
+ fs->userName, &nnUrl);
}
- if (!prepareUrl) {
- fprintf(stderr, "fail to create the url connecting to namenode "
- "for file creation/appending\n");
- ret = EIO;
+ if (ret) {
+ fprintf(stderr, "Failed to create the url connecting to namenode "
+ "for file creation/appending, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
goto done;
}
if (!append) {
- resp = launchNnWRITE(prepareUrl);
+ ret = launchNnWRITE(nnUrl, &resp);
} else {
- resp = launchNnAPPEND(prepareUrl);
+ ret = launchNnAPPEND(nnUrl, &resp);
}
- if (!resp) {
+ if (ret) {
fprintf(stderr, "fail to get the response from namenode for "
- "file creation/appending\n");
- ret = EIO;
+ "file creation/appending, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
goto done;
}
if (!append) {
- parseRet = parseNnWRITE(resp->header->content, resp->body->content);
+ ret = parseNnWRITE(resp->header->content, resp->body->content);
} else {
- parseRet = parseNnAPPEND(resp->header->content, resp->body->content);
+ ret = parseNnAPPEND(resp->header->content, resp->body->content);
}
- if (!parseRet) {
+ if (ret) {
fprintf(stderr, "fail to parse the response from namenode for "
- "file creation/appending\n");
- ret = EIO;
+ "file creation/appending, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
goto done;
}
- dnUrl = parseDnLoc(resp->header->content);
- if (!dnUrl) {
+ ret = parseDnLoc(resp->header->content, &dnUrl);
+ if (ret) {
fprintf(stderr, "fail to get the datanode url from namenode "
- "for file creation/appending\n");
- ret = EIO;
+ "for file creation/appending, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
goto done;
}
//store the datanode url in the file handle
@@ -892,18 +1036,23 @@ static int hdfsOpenOutputFileImpl(hdfsFS
ret = pthread_create(&webhandle->connThread, NULL,
writeThreadOperation, data);
if (ret) {
- fprintf(stderr, "Failed to create the writing thread.\n");
+ fprintf(stderr, "ERROR: failed to create the writing thread "
+ "in hdfsOpenOutputFileImpl, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
goto done;
}
webhandle->uploadBuffer->openFlag = 1;
done:
freeResponse(resp);
- free(prepareUrl);
+ free(nnUrl);
free(dnUrl);
if (ret) {
- free(data->url);
- free(data);
+ errno = ret;
+ if (data) {
+ free(data->url);
+ free(data);
+ }
}
return ret;
}
@@ -929,7 +1078,8 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const c
goto done;
}
if ((flags & O_CREAT) && (flags & O_EXCL)) {
- fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
+ fprintf(stderr,
+ "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
}
file = calloc(1, sizeof(struct hdfsFile_internal));
if (!file) {
@@ -947,12 +1097,13 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const c
webhandle->bufferSize = bufferSize;
webhandle->replication = replication;
webhandle->blockSize = blockSize;
- webhandle->absPath = getAbsolutePath(fs, path);
- if (!webhandle->absPath) {
- ret = ENOMEM;
+ ret = getAbsolutePath(fs, path, &webhandle->absPath);
+ if (ret) {
goto done;
}
file->file = webhandle;
+ // If open for write/append,
+ // open and keep the connection with the target datanode for writing
if (file->type == OUTPUT) {
ret = hdfsOpenOutputFileImpl(fs, file);
if (ret) {
@@ -988,7 +1139,9 @@ tSize hdfsWrite(hdfsFS fs, hdfsFile file
resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length);
return length;
} else {
- fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath);
+ fprintf(stderr,
+ "Error: have not opened the file %s for writing yet.\n",
+ wfile->absPath);
errno = EBADF;
return -1;
}
@@ -996,42 +1149,47 @@ tSize hdfsWrite(hdfsFS fs, hdfsFile file
int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
+ void *respv = NULL;
+ threadData *tdata = NULL;
int ret = 0;
- fprintf(stderr, "to close file...\n");
+ struct webhdfsFileHandle *wfile = NULL;
+
if (file->type == OUTPUT) {
- void *respv;
- threadData *tdata;
- struct webhdfsFileHandle *wfile = file->file;
+ wfile = file->file;
pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex));
wfile->uploadBuffer->closeFlag = 1;
pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close);
pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex));
- //waiting for the writing thread to terminate
+ // Waiting for the writing thread to terminate
ret = pthread_join(wfile->connThread, &respv);
if (ret) {
- fprintf(stderr, "Error (code %d) when pthread_join.\n", ret);
+ fprintf(stderr, "Error when pthread_join in hdfsClose, <%d>: %s.\n",
+ ret, hdfs_strerror(ret));
}
- //parse the response
- tdata = (threadData *) respv;
- if (!tdata) {
- fprintf(stderr, "Response from the writing thread is NULL.\n");
- ret = -1;
+ // Parse the response
+ tdata = respv;
+ if (!tdata || !(tdata->resp)) {
+ fprintf(stderr,
+ "ERROR: response from the writing thread is NULL.\n");
+ ret = EIO;
}
if (file->flags & O_APPEND) {
- parseDnAPPEND(tdata->resp->header->content, tdata->resp->body->content);
+ ret = parseDnAPPEND(tdata->resp->header->content,
+ tdata->resp->body->content);
} else {
- parseDnWRITE(tdata->resp->header->content, tdata->resp->body->content);
+ ret = parseDnWRITE(tdata->resp->header->content,
+ tdata->resp->body->content);
}
- //free the threaddata
+ // Free the threaddata
freeThreadData(tdata);
}
freeFileInternal(file);
- fprintf(stderr, "Closed the webfilehandle...\n");
if (ret) {
- errno = EIO;
+ errno = ret;
+ return -1;
}
- return ret;
+ return 0;
}
int hdfsFileIsOpenForRead(hdfsFile file)
@@ -1049,8 +1207,7 @@ static int hdfsReadImpl(hdfsFS fs, hdfsF
{
int ret = 0;
char *url = NULL;
- Response resp = NULL;
- int openResult = -1;
+ struct Response *resp = NULL;
if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL ||
length < 0) {
@@ -1068,30 +1225,41 @@ static int hdfsReadImpl(hdfsFS fs, hdfsF
ret = ENOMEM;
goto done;
}
- resp->header = initResponseBuffer();
- resp->body = initResponseBuffer();
+ ret = initResponseBuffer(&(resp->header));
+ if (ret) {
+ goto done;
+ }
+ ret = initResponseBuffer(&(resp->body));
+ if (ret) {
+ goto done;
+ }
+ memset(buffer, 0, length);
resp->body->content = buffer;
resp->body->remaining = length;
- if (!((url = prepareOPEN(fs->nn, fs->port, file->file->absPath,
- fs->userName, off, length))
- && (resp = launchOPEN(url, resp))
- && ((openResult = parseOPEN(resp->header->content, resp->body->content)) > 0))) {
- if (openResult == 0) {
- // Special case: if parseOPEN returns 0, we asked for a byte range
- // with outside what the file contains. In this case, hdfsRead and
- // hdfsPread return 0, meaning end-of-file.
- *numRead = 0;
- goto done;
- }
- ret = EIO;
+ ret = createUrlForOPEN(fs->nn, fs->port, file->file->absPath,
+ fs->userName, off, length, &url);
+ if (ret) {
goto done;
}
- *numRead = resp->body->offset;
-
+ ret = launchOPEN(url, resp);
+ if (ret) {
+ goto done;
+ }
+ ret = parseOPEN(resp->header->content, resp->body->content);
+ if (ret == -1) {
+ // Special case: if parseOPEN returns -1, we asked for a byte range
+ // outside what the file contains. In this case, hdfsRead and
+ // hdfsPread return 0, meaning end-of-file.
+ *numRead = 0;
+ } else if (ret == 0) {
+ *numRead = (tSize) resp->body->offset;
+ }
done:
- freeResponseBuffer(resp->header);
- free(resp->body);
+ if (resp) {
+ freeResponseBuffer(resp->header);
+ free(resp->body);
+ }
free(resp);
free(url);
return ret;
@@ -1099,11 +1267,12 @@ done:
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
{
- int ret;
+ int ret = 0;
tSize numRead = 0;
- ret = hdfsReadImpl(fs, file, buffer, file->offset, length, &numRead);
- if (ret) {
+ ret = hdfsReadImpl(fs, file, buffer, (tSize) file->offset,
+ length, &numRead);
+ if (ret > 0) { // ret == -1 means end of file
errno = ret;
return -1;
}
@@ -1119,18 +1288,6 @@ int hdfsAvailable(hdfsFS fs, hdfsFile fi
return 0;
}
-int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
-{
- errno = ENOTSUP;
- return -1;
-}
-
-int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
-{
- errno = ENOTSUP;
- return -1;
-}
-
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos)
{
struct webhdfsFileHandle *wf;
@@ -1172,7 +1329,8 @@ done:
return 0;
}
-tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length)
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
+ void* buffer, tSize length)
{
int ret;
tSize numRead = 0;
@@ -1181,8 +1339,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file
errno = EINVAL;
return -1;
}
- ret = hdfsReadImpl(fs, file, buffer, position, length, &numRead);
- if (ret) {
+ ret = hdfsReadImpl(fs, file, buffer, (tSize) position, length, &numRead);
+ if (ret > 0) {
errno = ret;
return -1;
}
@@ -1200,21 +1358,44 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile fil
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize)
{
+ int strlength;
if (fs == NULL || buffer == NULL || bufferSize <= 0) {
errno = EINVAL;
return NULL;
}
- if (snprintf(buffer, bufferSize, "%s", fs->workingDir) >= bufferSize) {
+ strlength = snprintf(buffer, bufferSize, "%s", fs->workingDir);
+ if (strlength >= bufferSize) {
errno = ENAMETOOLONG;
return NULL;
+ } else if (strlength < 0) {
+ errno = EIO;
+ return NULL;
}
return buffer;
}
+/** Collapse every run of consecutive '/' in path into a single "/", in place */
+static void normalizePath(char *path)
+{
+ int i = 0, j = 0, sawslash = 0;
+
+ for (i = j = sawslash = 0; path[i] != '\0'; i++) {
+ if (path[i] != '/') {
+ sawslash = 0;
+ path[j++] = path[i];
+ } else if (path[i] == '/' && !sawslash) {
+ sawslash = 1;
+ path[j++] = '/';
+ }
+ }
+ path[j] = '\0';
+}
+
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
{
- char *newWorkingDir;
- size_t strlenPath, newWorkingDirLen;
+ char *newWorkingDir = NULL;
+ size_t strlenPath = 0, newWorkingDirLen = 0;
+ int strlength;
if (fs == NULL || path == NULL) {
errno = EINVAL;
@@ -1225,25 +1406,28 @@ int hdfsSetWorkingDirectory(hdfsFS fs, c
errno = EINVAL;
return -1;
}
- if (path[0] != '/') {
- // TODO: support non-absolute paths. They should be interpreted
- // relative to the current path.
- errno = ENOTSUP;
- return -1;
- }
- if (strstr(path, "//")) {
- // TODO: support non-normalized paths (by normalizing them.)
- errno = ENOTSUP;
- return -1;
- }
- newWorkingDirLen = strlenPath + 2;
+ // the max buffer size needed for the new working dir is (length of old
+ // working dir) + (length of given path) + strlen("/") + 1 for the '\0'
+ newWorkingDirLen = strlen(fs->workingDir) + strlenPath + 2;
newWorkingDir = malloc(newWorkingDirLen);
if (!newWorkingDir) {
errno = ENOMEM;
return -1;
}
- snprintf(newWorkingDir, newWorkingDirLen, "%s%s",
- path, (path[strlenPath - 1] == '/') ? "" : "/");
+ strlength = snprintf(newWorkingDir, newWorkingDirLen, "%s%s%s",
+ (path[0] == '/') ? "" : fs->workingDir,
+ path, (path[strlenPath - 1] == '/') ? "" : "/");
+ if (strlength < 0 || strlength >= newWorkingDirLen) {
+ free(newWorkingDir);
+ errno = EIO;
+ return -1;
+ }
+
+ if (strstr(path, "//")) {
+ // normalize the path by replacing "//" with "/"
+ normalizePath(newWorkingDir);
+ }
+
free(fs->workingDir);
fs->workingDir = newWorkingDir;
return 0;
@@ -1283,7 +1467,7 @@ int hdfsHFlush(hdfsFS fs, hdfsFile file)
errno = EINVAL;
return -1;
}
- // TODO: block until our write buffer is flushed
+ // TODO: block until our write buffer is flushed (HDFS-3952)
return 0;
}
@@ -1293,7 +1477,7 @@ int hdfsFlush(hdfsFS fs, hdfsFile file)
errno = EINVAL;
return -1;
}
- // TODO: block until our write buffer is flushed
+ // TODO: block until our write buffer is flushed (HDFS-3952)
return 0;
}
@@ -1316,3 +1500,15 @@ tOffset hdfsGetUsed(hdfsFS fs)
return -1;
}
+int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c Mon Oct 29 05:10:29 2012
@@ -146,6 +146,7 @@ static int hashTableInit(void)
if (hcreate(MAX_HASH_TABLE_ELEM) == 0) {
fprintf(stderr, "error creating hashtable, <%d>: %s\n",
errno, strerror(errno));
+ UNLOCK_HASH_TABLE();
return 0;
}
hashTableInited = 1;