You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/10/29 06:10:29 UTC
svn commit: r1403173 [1/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/contrib/libwebhdfs/src/ src/main/native/libhdfs/
Author: suresh
Date: Mon Oct 29 05:10:29 2012
New Revision: 1403173
URL: http://svn.apache.org/viewvc?rev=1403173&view=rev
Log:
HDFS-3920. libwebdhfs string processing and using strerror consistently to handle all errors. Contributed by Jing Zhao.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Oct 29 05:10:29 2012
@@ -761,6 +761,9 @@ Release 2.0.2-alpha - 2012-09-07
HDFS-3907. Allow multiple users for local block readers. (eli)
HDFS-3910. DFSTestUtil#waitReplication should timeout. (eli)
+
+ HDFS-3920. libwebdhfs string processing and using strerror consistently
+ to handle all errors. (Jing Zhao via suresh)
OPTIMIZATIONS
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c Mon Oct 29 05:10:29 2012
@@ -15,28 +15,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
-#include <pthread.h>
+
#include "hdfs_http_client.h"
static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int curlGlobalInited = 0;
-ResponseBuffer initResponseBuffer() {
- ResponseBuffer info = (ResponseBuffer) calloc(1, sizeof(ResponseBufferInternal));
+const char *hdfs_strerror(int errnoval)
+{
+ const char *msg = NULL;
+ if (errnoval < 0 || errnoval >= sys_nerr) {
+ msg = "Invalid Error Code";
+ } else if (sys_errlist == NULL) {
+ msg = "Unknown Error";
+ } else {
+ msg = sys_errlist[errnoval];
+ }
+ return msg;
+}
+
+int initResponseBuffer(struct ResponseBuffer **buffer)
+{
+ struct ResponseBuffer *info = NULL;
+ int ret = 0;
+ info = calloc(1, sizeof(struct ResponseBuffer));
if (!info) {
- fprintf(stderr, "Cannot allocate memory for responseInfo\n");
- return NULL;
+ ret = ENOMEM;
}
- info->remaining = 0;
- info->offset = 0;
- info->content = NULL;
- return info;
+ *buffer = info;
+ return ret;
}
-void freeResponseBuffer(ResponseBuffer buffer) {
+void freeResponseBuffer(struct ResponseBuffer *buffer)
+{
if (buffer) {
if (buffer->content) {
free(buffer->content);
@@ -46,8 +61,9 @@ void freeResponseBuffer(ResponseBuffer b
}
}
-void freeResponse(Response resp) {
- if(resp) {
+void freeResponse(struct Response *resp)
+{
+ if (resp) {
freeResponseBuffer(resp->body);
freeResponseBuffer(resp->header);
free(resp);
@@ -55,21 +71,30 @@ void freeResponse(Response resp) {
}
}
-/* Callback for allocating local buffer and reading data to local buffer */
-static size_t writefunc(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) {
+/**
+ * Callback used by libcurl for allocating local buffer and
+ * reading data to local buffer
+ */
+static size_t writefunc(void *ptr, size_t size,
+ size_t nmemb, struct ResponseBuffer *rbuffer)
+{
+ void *temp = NULL;
if (size * nmemb < 1) {
return 0;
}
if (!rbuffer) {
- fprintf(stderr, "In writefunc, ResponseBuffer is NULL.\n");
- return -1;
+ fprintf(stderr,
+ "ERROR: ResponseBuffer is NULL for the callback writefunc.\n");
+ return 0;
}
if (rbuffer->remaining < size * nmemb) {
- rbuffer->content = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1);
- if (rbuffer->content == NULL) {
- return -1;
+ temp = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1);
+ if (temp == NULL) {
+ fprintf(stderr, "ERROR: fail to realloc in callback writefunc.\n");
+ return 0;
}
+ rbuffer->content = temp;
rbuffer->remaining = size * nmemb;
}
memcpy(rbuffer->content + rbuffer->offset, ptr, size * nmemb);
@@ -80,67 +105,84 @@ static size_t writefunc(void *ptr, size_
}
/**
- * Callback for reading data to buffer provided by user,
+ * Callback used by libcurl for reading data into buffer provided by user,
* thus no need to reallocate buffer.
*/
-static size_t writefunc_withbuffer(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) {
+static size_t writeFuncWithUserBuffer(void *ptr, size_t size,
+ size_t nmemb, struct ResponseBuffer *rbuffer)
+{
+ size_t toCopy = 0;
if (size * nmemb < 1) {
return 0;
}
if (!rbuffer || !rbuffer->content) {
- fprintf(stderr, "In writefunc_withbuffer, the buffer provided by user is NULL.\n");
+ fprintf(stderr,
+ "ERROR: buffer to read is NULL for the "
+ "callback writeFuncWithUserBuffer.\n");
return 0;
}
- size_t toCopy = rbuffer->remaining < (size * nmemb) ? rbuffer->remaining : (size * nmemb);
+ toCopy = rbuffer->remaining < (size * nmemb) ?
+ rbuffer->remaining : (size * nmemb);
memcpy(rbuffer->content + rbuffer->offset, ptr, toCopy);
rbuffer->offset += toCopy;
rbuffer->remaining -= toCopy;
return toCopy;
}
-//callback for writing data to remote peer
-static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream) {
+/**
+ * Callback used by libcurl for writing data to remote peer
+ */
+static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream)
+{
+ struct webhdfsBuffer *wbuffer = NULL;
if (size * nmemb < 1) {
- fprintf(stderr, "In readfunc callback: size * nmemb == %ld\n", size * nmemb);
return 0;
}
- webhdfsBuffer *wbuffer = (webhdfsBuffer *) stream;
+ wbuffer = stream;
pthread_mutex_lock(&wbuffer->writeMutex);
while (wbuffer->remaining == 0) {
/*
- * the current remainning bytes to write is 0,
- * check whether need to finish the transfer
+ * The number of bytes remaining to write is 0;
+ * check closeFlag to see whether the transfer needs to finish.
* if yes, return 0; else, wait
*/
- if (wbuffer->closeFlag) {
- //we can close the transfer now
+ if (wbuffer->closeFlag) { // We can close the transfer now
+ //For debug
fprintf(stderr, "CloseFlag is set, ready to close the transfer\n");
pthread_mutex_unlock(&wbuffer->writeMutex);
return 0;
} else {
- // len == 0 indicates that user's buffer has been transferred
+ // remaining == 0 but closeFlag is not set
+ // indicates that user's buffer has been transferred
pthread_cond_signal(&wbuffer->transfer_finish);
- pthread_cond_wait(&wbuffer->newwrite_or_close, &wbuffer->writeMutex);
+ pthread_cond_wait(&wbuffer->newwrite_or_close,
+ &wbuffer->writeMutex);
}
}
- if(wbuffer->remaining > 0 && !wbuffer->closeFlag) {
- size_t copySize = wbuffer->remaining < size * nmemb ? wbuffer->remaining : size * nmemb;
+ if (wbuffer->remaining > 0 && !wbuffer->closeFlag) {
+ size_t copySize = wbuffer->remaining < size * nmemb ?
+ wbuffer->remaining : size * nmemb;
memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize);
wbuffer->offset += copySize;
wbuffer->remaining -= copySize;
pthread_mutex_unlock(&wbuffer->writeMutex);
return copySize;
} else {
- fprintf(stderr, "Webhdfs buffer is %ld, it should be a positive value!\n", wbuffer->remaining);
+ fprintf(stderr, "ERROR: webhdfsBuffer's remaining is %ld, "
+ "it should be a positive value!\n", wbuffer->remaining);
pthread_mutex_unlock(&wbuffer->writeMutex);
return 0;
}
}
-static void initCurlGlobal() {
+/**
+ * Initialize the global libcurl environment
+ */
+static void initCurlGlobal()
+{
if (!curlGlobalInited) {
pthread_mutex_lock(&curlInitMutex);
if (!curlGlobalInited) {
@@ -151,202 +193,297 @@ static void initCurlGlobal() {
}
}
-static Response launchCmd(char *url, enum HttpHeader method, enum Redirect followloc) {
- CURL *curl;
- CURLcode res;
- Response resp;
+/**
+ * Launch simple commands (commands without file I/O) and return response
+ *
+ * @param url Target URL
+ * @param method HTTP method (GET/PUT/POST)
+ * @param followloc Whether or not need to set CURLOPT_FOLLOWLOCATION
+ * @param response Response from remote service
+ * @return 0 for success and non-zero value to indicate error
+ */
+static int launchCmd(const char *url, enum HttpHeader method,
+ enum Redirect followloc, struct Response **response)
+{
+ CURL *curl = NULL;
+ CURLcode curlCode;
+ int ret = 0;
+ struct Response *resp = NULL;
- resp = (Response) calloc(1, sizeof(*resp));
+ resp = calloc(1, sizeof(struct Response));
if (!resp) {
- return NULL;
+ return ENOMEM;
+ }
+ ret = initResponseBuffer(&(resp->body));
+ if (ret) {
+ goto done;
+ }
+ ret = initResponseBuffer(&(resp->header));
+ if (ret) {
+ goto done;
}
- resp->body = initResponseBuffer();
- resp->header = initResponseBuffer();
initCurlGlobal();
- curl = curl_easy_init(); /* get a curl handle */
- if(curl) {
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
- curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
- curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
- curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */
- switch(method) {
- case GET:
- break;
- case PUT:
- curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
- break;
- case POST:
- curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST");
- break;
- case DELETE:
- curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
- break;
- default:
- fprintf(stderr, "\nHTTP method not defined\n");
- exit(EXIT_FAILURE);
- }
- if(followloc == YES) {
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
- }
-
- res = curl_easy_perform(curl); /* Now run the curl handler */
- if(res != CURLE_OK) {
- fprintf(stderr, "preform the URL %s failed\n", url);
- return NULL;
- }
+ curl = curl_easy_init();
+ if (!curl) {
+ ret = ENOMEM; // curl_easy_init does not return error code,
+ // and most of its errors are caused by malloc()
+ fprintf(stderr, "ERROR in curl_easy_init.\n");
+ goto done;
+ }
+ /* Set callback function for reading data from remote service */
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+ switch(method) {
+ case GET:
+ break;
+ case PUT:
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
+ break;
+ case POST:
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
+ break;
+ case DELETE:
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
+ break;
+ default:
+ ret = EINVAL;
+ fprintf(stderr, "ERROR: Invalid HTTP method\n");
+ goto done;
+ }
+ if (followloc == YES) {
+ curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+ }
+ /* Now run the curl handler */
+ curlCode = curl_easy_perform(curl);
+ if (curlCode != CURLE_OK) {
+ ret = EIO;
+ fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
+ url, curlCode, curl_easy_strerror(curlCode));
+ }
+done:
+ if (curl != NULL) {
curl_easy_cleanup(curl);
}
- return resp;
+ if (ret) {
+ free(resp);
+ resp = NULL;
+ }
+ *response = resp;
+ return ret;
}
-static Response launchRead_internal(char *url, enum HttpHeader method, enum Redirect followloc, Response resp) {
+/**
+ * Launch the read request. The request is sent to the NameNode and then
+ * redirected to corresponding DataNode
+ *
+ * @param url The URL for the read request
+ * @param resp The response containing the buffer provided by user
+ * @return 0 for success and non-zero value to indicate error
+ */
+static int launchReadInternal(const char *url, struct Response* resp)
+{
+ CURL *curl;
+ CURLcode curlCode;
+ int ret = 0;
+
if (!resp || !resp->body || !resp->body->content) {
- fprintf(stderr, "The user provided buffer should not be NULL!\n");
- return NULL;
+ fprintf(stderr,
+ "ERROR: invalid user-provided buffer!\n");
+ return EINVAL;
}
- CURL *curl;
- CURLcode res;
initCurlGlobal();
- curl = curl_easy_init(); /* get a curl handle */
- if(curl) {
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc_withbuffer);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
- curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
- curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
- curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */
- if(followloc == YES) {
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
- }
-
- res = curl_easy_perform(curl); /* Now run the curl handler */
- if(res != CURLE_OK && res != CURLE_PARTIAL_FILE) {
- fprintf(stderr, "preform the URL %s failed\n", url);
- return NULL;
- }
- curl_easy_cleanup(curl);
+ /* get a curl handle */
+ curl = curl_easy_init();
+ if (!curl) {
+ fprintf(stderr, "ERROR in curl_easy_init.\n");
+ return ENOMEM;
}
- return resp;
-
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFuncWithUserBuffer);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+ curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+
+ curlCode = curl_easy_perform(curl);
+ if (curlCode != CURLE_OK && curlCode != CURLE_PARTIAL_FILE) {
+ ret = EIO;
+ fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
+ url, curlCode, curl_easy_strerror(curlCode));
+ }
+
+ curl_easy_cleanup(curl);
+ return ret;
}
-static Response launchWrite(const char *url, enum HttpHeader method, webhdfsBuffer *uploadBuffer) {
+/**
+ * The function does the write operation by connecting to a DataNode.
+ * The function keeps the connection with the DataNode until
+ * the closeFlag is set. Whenever the current data has been sent out,
+ * the function blocks waiting for further input from user or close.
+ *
+ * @param url URL of the remote DataNode
+ * @param method PUT for create and POST for append
+ * @param uploadBuffer Buffer storing user's data to write
+ * @param response Response from remote service
+ * @return 0 for success and non-zero value to indicate error
+ */
+static int launchWrite(const char *url, enum HttpHeader method,
+ struct webhdfsBuffer *uploadBuffer,
+ struct Response **response)
+{
+ CURLcode curlCode;
+ struct Response* resp = NULL;
+ struct curl_slist *chunk = NULL;
+ CURL *curl = NULL;
+ int ret = 0;
+
if (!uploadBuffer) {
- fprintf(stderr, "upload buffer is NULL!\n");
- errno = EINVAL;
- return NULL;
+ fprintf(stderr, "ERROR: upload buffer is NULL!\n");
+ return EINVAL;
}
+
initCurlGlobal();
- CURLcode res;
- Response response = (Response) calloc(1, sizeof(*response));
- if (!response) {
- fprintf(stderr, "failed to allocate memory for response\n");
- return NULL;
+ resp = calloc(1, sizeof(struct Response));
+ if (!resp) {
+ return ENOMEM;
+ }
+ ret = initResponseBuffer(&(resp->body));
+ if (ret) {
+ goto done;
+ }
+ ret = initResponseBuffer(&(resp->header));
+ if (ret) {
+ goto done;
}
- response->body = initResponseBuffer();
- response->header = initResponseBuffer();
- //connect to the datanode in order to create the lease in the namenode
- CURL *curl = curl_easy_init();
+ // Connect to the datanode in order to create the lease in the namenode
+ curl = curl_easy_init();
if (!curl) {
- fprintf(stderr, "Failed to initialize the curl handle.\n");
- return NULL;
+ fprintf(stderr, "ERROR: failed to initialize the curl handle.\n");
+ return ENOMEM;
}
curl_easy_setopt(curl, CURLOPT_URL, url);
- if(curl) {
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, response->body);
- curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
- curl_easy_setopt(curl, CURLOPT_WRITEHEADER, response->header);
- curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc);
- curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer);
- curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
-
- struct curl_slist *chunk = NULL;
- chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked");
- res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
- chunk = curl_slist_append(chunk, "Expect:");
- res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
-
- switch(method) {
- case GET:
- break;
- case PUT:
- curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
- break;
- case POST:
- curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST");
- break;
- case DELETE:
- curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
- break;
- default:
- fprintf(stderr, "\nHTTP method not defined\n");
- exit(EXIT_FAILURE);
- }
- res = curl_easy_perform(curl);
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
+ curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc);
+ curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer);
+ curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
+
+ chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked");
+ curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
+ chunk = curl_slist_append(chunk, "Expect:");
+ curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
+
+ switch(method) {
+ case PUT:
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
+ break;
+ case POST:
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
+ break;
+ default:
+ ret = EINVAL;
+ fprintf(stderr, "ERROR: Invalid HTTP method\n");
+ goto done;
+ }
+ curlCode = curl_easy_perform(curl);
+ if (curlCode != CURLE_OK) {
+ ret = EIO;
+ fprintf(stderr, "ERROR: preform the URL %s failed, <%d>: %s\n",
+ url, curlCode, curl_easy_strerror(curlCode));
+ }
+
+done:
+ if (chunk != NULL) {
curl_slist_free_all(chunk);
+ }
+ if (curl != NULL) {
curl_easy_cleanup(curl);
}
-
- return response;
+ if (ret) {
+ free(resp);
+ resp = NULL;
+ }
+ *response = resp;
+ return ret;
}
-Response launchMKDIR(char *url) {
- return launchCmd(url, PUT, NO);
+int launchMKDIR(const char *url, struct Response **resp)
+{
+ return launchCmd(url, PUT, NO, resp);
}
-Response launchRENAME(char *url) {
- return launchCmd(url, PUT, NO);
+int launchRENAME(const char *url, struct Response **resp)
+{
+ return launchCmd(url, PUT, NO, resp);
}
-Response launchGFS(char *url) {
- return launchCmd(url, GET, NO);
+int launchGFS(const char *url, struct Response **resp)
+{
+ return launchCmd(url, GET, NO, resp);
}
-Response launchLS(char *url) {
- return launchCmd(url, GET, NO);
+int launchLS(const char *url, struct Response **resp)
+{
+ return launchCmd(url, GET, NO, resp);
}
-Response launchCHMOD(char *url) {
- return launchCmd(url, PUT, NO);
+int launchCHMOD(const char *url, struct Response **resp)
+{
+ return launchCmd(url, PUT, NO, resp);
}
-Response launchCHOWN(char *url) {
- return launchCmd(url, PUT, NO);
+int launchCHOWN(const char *url, struct Response **resp)
+{
+ return launchCmd(url, PUT, NO, resp);
}
-Response launchDELETE(char *url) {
- return launchCmd(url, DELETE, NO);
+int launchDELETE(const char *url, struct Response **resp)
+{
+ return launchCmd(url, DELETE, NO, resp);
}
-Response launchOPEN(char *url, Response resp) {
- return launchRead_internal(url, GET, YES, resp);
+int launchOPEN(const char *url, struct Response* resp)
+{
+ return launchReadInternal(url, resp);
}
-Response launchUTIMES(char *url) {
- return launchCmd(url, PUT, NO);
+int launchUTIMES(const char *url, struct Response **resp)
+{
+ return launchCmd(url, PUT, NO, resp);
}
-Response launchNnWRITE(char *url) {
- return launchCmd(url, PUT, NO);
+int launchNnWRITE(const char *url, struct Response **resp)
+{
+ return launchCmd(url, PUT, NO, resp);
}
-Response launchNnAPPEND(char *url) {
- return launchCmd(url, POST, NO);
+int launchNnAPPEND(const char *url, struct Response **resp)
+{
+ return launchCmd(url, POST, NO, resp);
}
-Response launchDnWRITE(const char *url, webhdfsBuffer *buffer) {
- return launchWrite(url, PUT, buffer);
+int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer,
+ struct Response **resp)
+{
+ return launchWrite(url, PUT, buffer, resp);
}
-Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer) {
- return launchWrite(url, POST, buffer);
+int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer,
+ struct Response **resp)
+{
+ return launchWrite(url, POST, buffer, resp);
}
-Response launchSETREPLICATION(char *url) {
- return launchCmd(url, PUT, NO);
+int launchSETREPLICATION(const char *url, struct Response **resp)
+{
+ return launchCmd(url, PUT, NO, resp);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h Mon Oct 29 05:10:29 2012
@@ -26,6 +26,7 @@
#include <pthread.h> /* for pthread_t */
#include <unistd.h> /* for size_t */
+/** enum indicating the type of hdfs stream */
enum hdfsStreamType
{
UNINITIALIZED = 0,
@@ -36,28 +37,39 @@ enum hdfsStreamType
/**
* webhdfsBuffer - used for hold the data for read/write from/to http connection
*/
-typedef struct {
- const char *wbuffer; // The user's buffer for uploading
- size_t remaining; // Length of content
- size_t offset; // offset for reading
- int openFlag; // Check whether the hdfsOpenFile has been called before
- int closeFlag; // Whether to close the http connection for writing
- pthread_mutex_t writeMutex; // Synchronization between the curl and hdfsWrite threads
- pthread_cond_t newwrite_or_close; // Transferring thread waits for this condition
- // when there is no more content for transferring in the buffer
- pthread_cond_t transfer_finish; // Condition used to indicate finishing transferring (one buffer)
-} webhdfsBuffer;
+struct webhdfsBuffer {
+ const char *wbuffer; /* The user's buffer for uploading */
+ size_t remaining; /* Length of content */
+ size_t offset; /* offset for reading */
+ /* Check whether the hdfsOpenFile has been called before */
+ int openFlag;
+ /* Whether to close the http connection for writing */
+ int closeFlag;
+ /* Synchronization between the curl and hdfsWrite threads */
+ pthread_mutex_t writeMutex;
+ /*
+ * Transferring thread waits for this condition
+ * when there is no more content for transferring in the buffer
+ */
+ pthread_cond_t newwrite_or_close;
+ /* Condition used to indicate finishing transferring (one buffer) */
+ pthread_cond_t transfer_finish;
+};
+/** File handle for webhdfs */
struct webhdfsFileHandle {
- char *absPath;
- int bufferSize;
- short replication;
- tSize blockSize;
- char *datanode;
- webhdfsBuffer *uploadBuffer;
+ char *absPath; /* Absolute path of file */
+ int bufferSize; /* Size of buffer */
+ short replication; /* Number of replication */
+ tSize blockSize; /* Block size */
+ char *datanode; /* URL of the DataNode */
+ /* webhdfsBuffer handle used to store the upload data */
+ struct webhdfsBuffer *uploadBuffer;
+ /* The thread used for data transferring */
pthread_t connThread;
};
+/** HTTP request method */
enum HttpHeader {
GET,
PUT,
@@ -65,44 +77,218 @@ enum HttpHeader {
DELETE
};
+/** Whether to redirect */
enum Redirect {
YES,
NO
};
-typedef struct {
+/** Buffer used for holding response */
+struct ResponseBuffer {
char *content;
size_t remaining;
size_t offset;
-} ResponseBufferInternal;
-typedef ResponseBufferInternal *ResponseBuffer;
+};
/**
* The response got through webhdfs
*/
-typedef struct {
- ResponseBuffer body;
- ResponseBuffer header;
-}* Response;
-
-ResponseBuffer initResponseBuffer();
-void freeResponseBuffer(ResponseBuffer buffer);
-void freeResponse(Response resp);
-
-Response launchMKDIR(char *url);
-Response launchRENAME(char *url);
-Response launchCHMOD(char *url);
-Response launchGFS(char *url);
-Response launchLS(char *url);
-Response launchDELETE(char *url);
-Response launchCHOWN(char *url);
-Response launchOPEN(char *url, Response resp);
-Response launchUTIMES(char *url);
-Response launchNnWRITE(char *url);
-
-Response launchDnWRITE(const char *url, webhdfsBuffer *buffer);
-Response launchNnAPPEND(char *url);
-Response launchSETREPLICATION(char *url);
-Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer);
+struct Response {
+ struct ResponseBuffer *body;
+ struct ResponseBuffer *header;
+};
+
+/**
+ * Create and initialize a ResponseBuffer
+ *
+ * @param buffer Pointer pointing to new created ResponseBuffer handle
+ * @return 0 for success, non-zero value to indicate error
+ */
+int initResponseBuffer(struct ResponseBuffer **buffer) __attribute__ ((warn_unused_result));
+
+/**
+ * Free the given ResponseBuffer
+ *
+ * @param buffer The ResponseBuffer to free
+ */
+void freeResponseBuffer(struct ResponseBuffer *buffer);
+
+/**
+ * Free the given Response
+ *
+ * @param resp The Response to free
+ */
+void freeResponse(struct Response *resp);
+
+/**
+ * Send the MKDIR request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for MKDIR operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchMKDIR(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the RENAME request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for RENAME operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchRENAME(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the CHMOD request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for CHMOD operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchCHMOD(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the GetFileStatus request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for GetFileStatus operation
+ * @param response Response handle to store response returned from the NameNode,
+ * containing either file status or exception information
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchGFS(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the LS (LISTSTATUS) request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for LISTSTATUS operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchLS(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the DELETE request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for DELETE operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchDELETE(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the CHOWN request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for CHOWN operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchCHOWN(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the OPEN request to NameNode using the given URL,
+ * asking for reading a file (within a range).
+ * The NameNode first redirects the request to the datanode
+ * that holds the corresponding first block of the file (within a range),
+ * and the datanode returns the content of the file through the HTTP connection.
+ *
+ * @param url The URL for OPEN operation
+ * @param resp The response holding user's buffer.
+ * The file content will be written into the buffer.
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchOPEN(const char *url,
+ struct Response* resp) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the SETTIMES request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for SETTIMES operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchUTIMES(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the WRITE/CREATE request to NameNode using the given URL.
+ * The NameNode will choose the writing target datanodes
+ * and return the first datanode in the pipeline as response
+ *
+ * @param url The URL for WRITE/CREATE operation connecting to NameNode
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchNnWRITE(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the WRITE request along with to-write content to
+ * the corresponding DataNode using the given URL.
+ * The DataNode will write the data and return the response.
+ *
+ * @param url The URL for WRITE operation connecting to DataNode
+ * @param buffer The webhdfsBuffer containing data to be written to hdfs
+ * @param response Response handle to store response returned from the DataNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchDnWRITE(const char *url, struct webhdfsBuffer *buffer,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the WRITE (APPEND) request to NameNode using the given URL.
+ * The NameNode determines the DataNode for appending and
+ * sends its URL back as response.
+ *
+ * @param url The URL for APPEND operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchNnAPPEND(const char *url, struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the SETREPLICATION request to NameNode using the given URL.
+ * The NameNode will execute the operation and return the result as response.
+ *
+ * @param url The URL for SETREPLICATION operation
+ * @param response Response handle to store response returned from the NameNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchSETREPLICATION(const char *url,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Send the APPEND request along with the content to DataNode.
+ * The DataNode will do the appending and return the result as response.
+ *
+ * @param url The URL for APPEND operation connecting to DataNode
+ * @param buffer The webhdfsBuffer containing data to be appended
+ * @param response Response handle to store response returned from the DataNode
+ * @return 0 for success, non-zero value to indicate error
+ */
+int launchDnAPPEND(const char *url, struct webhdfsBuffer *buffer,
+ struct Response **response) __attribute__ ((warn_unused_result));
+
+/**
+ * Look up sys_errlist to get the error message string for the given error code
+ *
+ * @param errnoval The error code value
+ * @return The error message string mapped to the given error code
+ */
+const char *hdfs_strerror(int errnoval);
#endif //_HDFS_HTTP_CLIENT_H_
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c Mon Oct 29 05:10:29 2012
@@ -22,233 +22,381 @@
#include <stdio.h>
#include <errno.h>
-#define NUM_OF_PERMISSION_BITS 4
-#define NUM_OF_PORT_BITS 6
-#define NUM_OF_REPLICATION_BITS 6
-
-static char *prepareQUERY(const char *host, int nnPort, const char *srcpath, const char *OP, const char *user) {
- size_t length;
- char *url;
- const char *const protocol = "http://";
- const char *const prefix = "/webhdfs/v1";
- char *temp;
- char *port;
- port= (char*) malloc(NUM_OF_PORT_BITS);
- if (!port) {
- return NULL;
- }
- sprintf(port,"%d",nnPort);
- if (user != NULL) {
- length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP) + strlen("&user.name=") + strlen(user);
- } else {
- length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP);
+#define PERM_STR_LEN 4 // "644" + one byte for NUL
+#define SHORT_STR_LEN 6 // 65535 + NUL
+#define LONG_STR_LEN 21 // 2^64-1 = 18446744073709551615 + NUL
+
+/**
+ * Create query based on NameNode hostname,
+ * NameNode port, path, operation and other parameters
+ *
+ * @param host NameNode hostName
+ * @param nnPort Port of NameNode
+ * @param path Absolute path for the corresponding file
+ * @param op Operations
+ * @param paraNum Number of remaining parameters
+ * @param paraNames Names of remaining parameters
+ * @param paraValues Values of remaining parameters
+ * @param queryUrl Holding the created URL (malloc'ed here; freed by the caller)
+ * @return 0 on success and non-zero value to indicate error
+ */
+static int createQueryURL(const char *host, unsigned int nnPort,
+ const char *path, const char *op, int paraNum,
+ const char **paraNames, const char **paraValues,
+ char **queryUrl)
+{
+ size_t length = 0;
+ int i = 0, offset = 0, ret = 0;
+ char *url = NULL;
+ const char *protocol = "http://";
+ const char *prefix = "/webhdfs/v1";
+
+ if (!paraNames || !paraValues) {
+ return EINVAL;
+ }
+ length = strlen(protocol) + strlen(host) + strlen(":") +
+ SHORT_STR_LEN + strlen(prefix) + strlen(path) +
+ strlen ("?op=") + strlen(op);
+ for (i = 0; i < paraNum; i++) {
+ if (paraNames[i] && paraValues[i]) {
+ length += 2 + strlen(paraNames[i]) + strlen(paraValues[i]);
+ }
+ }
+ url = malloc(length); // The '\0' has already been included
+ // when using SHORT_STR_LEN
+ if (!url) {
+ return ENOMEM;
}
- temp = (char*) malloc(length + 1);
- if (!temp) {
- return NULL;
+ offset = snprintf(url, length, "%s%s:%d%s%s?op=%s",
+ protocol, host, nnPort, prefix, path, op);
+ if (offset >= length || offset < 0) {
+ ret = EIO;
+ goto done;
+ }
+ for (i = 0; i < paraNum; i++) {
+ if (!paraNames[i] || !paraValues[i] || paraNames[i][0] == '\0' ||
+ paraValues[i][0] == '\0') {
+ continue;
+ }
+ offset += snprintf(url + offset, length - offset,
+ "&%s=%s", paraNames[i], paraValues[i]);
+ if (offset >= length || offset < 0) {
+ ret = EIO;
+ goto done;
+ }
}
- strcpy(temp,protocol);
- temp = strcat(temp,host);
- temp = strcat(temp,":");
- temp = strcat(temp,port);
- temp = strcat(temp,prefix);
- temp = strcat(temp,srcpath);
- temp = strcat(temp,"?op=");
- temp = strcat(temp,OP);
- if (user) {
- temp = strcat(temp,"&user.name=");
- temp = strcat(temp,user);
+done:
+ if (ret) {
+ free(url);
+ return ret;
}
- url = temp;
- return url;
+ *queryUrl = url;
+ return 0;
}
-
-static int decToOctal(int decNo) {
- int octNo=0;
- int expo =0;
- while (decNo != 0) {
- octNo = ((decNo % 8) * pow(10,expo)) + octNo;
- decNo = decNo / 8;
- expo++;
- }
- return octNo;
+int createUrlForMKDIR(const char *host, int nnPort,
+ const char *path, const char *user, char **url)
+{
+ const char *userPara = "user.name";
+ return createQueryURL(host, nnPort, path, "MKDIRS", 1,
+ &userPara, &user, url);
}
-
-char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user) {
- return prepareQUERY(host, nnPort, dirsubpath, "MKDIRS", user);
+int createUrlForGetFileStatus(const char *host, int nnPort, const char *path,
+ const char *user, char **url)
+{
+ const char *userPara = "user.name";
+ return createQueryURL(host, nnPort, path, "GETFILESTATUS", 1,
+ &userPara, &user, url);
}
-
-char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) {
- char *url;
- char *permission;
- permission = (char*) malloc(NUM_OF_PERMISSION_BITS);
- if (!permission) {
- return NULL;
- }
- mode = decToOctal(mode);
- sprintf(permission,"%d",mode);
- url = prepareMKDIR(host, nnPort, dirsubpath, user);
- url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1));
- if (!url) {
- return NULL;
- }
- url = strcat(url,"&permission=");
- url = strcat(url,permission);
- return url;
+int createUrlForLS(const char *host, int nnPort, const char *path,
+ const char *user, char **url)
+{
+ const char *userPara = "user.name";
+ return createQueryURL(host, nnPort, path, "LISTSTATUS",
+ 1, &userPara, &user, url);
}
-
-char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user) {
- char *url;
- url = prepareQUERY(host, nnPort, srcpath, "RENAME", user);
- url = realloc(url,(strlen(url) + strlen("&destination=") + strlen(destpath) + 1));
- if (!url) {
- return NULL;
- }
- url = strcat(url,"&destination=");
- url = strcat(url,destpath);
- return url;
+int createUrlForNnAPPEND(const char *host, int nnPort, const char *path,
+ const char *user, char **url)
+{
+ const char *userPara = "user.name";
+ return createQueryURL(host, nnPort, path, "APPEND",
+ 1, &userPara, &user, url);
}
-char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user) {
- return (prepareQUERY(host, nnPort, dirsubpath, "GETFILESTATUS", user));
+int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path,
+ int mode, const char *user, char **url)
+{
+ int strlength;
+ char permission[PERM_STR_LEN];
+ const char *paraNames[2], *paraValues[2];
+
+ paraNames[0] = "permission";
+ paraNames[1] = "user.name";
+ memset(permission, 0, PERM_STR_LEN);
+ strlength = snprintf(permission, PERM_STR_LEN, "%o", mode);
+ if (strlength < 0 || strlength >= PERM_STR_LEN) {
+ return EIO;
+ }
+ paraValues[0] = permission;
+ paraValues[1] = user;
+
+ return createQueryURL(host, nnPort, path, "MKDIRS", 2,
+ paraNames, paraValues, url);
}
-char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user) {
- return (prepareQUERY(host, nnPort, dirsubpath, "LISTSTATUS", user));
+int createUrlForRENAME(const char *host, int nnPort, const char *srcpath,
+ const char *destpath, const char *user, char **url)
+{
+ const char *paraNames[2], *paraValues[2];
+ paraNames[0] = "destination";
+ paraNames[1] = "user.name";
+ paraValues[0] = destpath;
+ paraValues[1] = user;
+
+ return createQueryURL(host, nnPort, srcpath,
+ "RENAME", 2, paraNames, paraValues, url);
}
-char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) {
- char *url;
- char *permission;
- permission = (char*) malloc(NUM_OF_PERMISSION_BITS);
- if (!permission) {
- return NULL;
- }
- mode &= 0x3FFF;
- mode = decToOctal(mode);
- sprintf(permission,"%d",mode);
- url = prepareQUERY(host, nnPort, dirsubpath, "SETPERMISSION", user);
- url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1));
- if (!url) {
- return NULL;
+int createUrlForCHMOD(const char *host, int nnPort, const char *path,
+ int mode, const char *user, char **url)
+{
+ int strlength;
+ char permission[PERM_STR_LEN];
+ const char *paraNames[2], *paraValues[2];
+
+ paraNames[0] = "permission";
+ paraNames[1] = "user.name";
+ memset(permission, 0, PERM_STR_LEN);
+ strlength = snprintf(permission, PERM_STR_LEN, "%o", mode);
+ if (strlength < 0 || strlength >= PERM_STR_LEN) {
+ return EIO;
}
- url = strcat(url,"&permission=");
- url = strcat(url,permission);
- return url;
+ paraValues[0] = permission;
+ paraValues[1] = user;
+
+ return createQueryURL(host, nnPort, path, "SETPERMISSION",
+ 2, paraNames, paraValues, url);
}
-char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user) {
- char *url = (prepareQUERY(host, nnPort, dirsubpath, "DELETE", user));
- char *recursiveFlag = (char *)malloc(6);
- if (!recursive) {
- strcpy(recursiveFlag, "false");
+int createUrlForDELETE(const char *host, int nnPort, const char *path,
+ int recursive, const char *user, char **url)
+{
+ const char *paraNames[2], *paraValues[2];
+ paraNames[0] = "recursive";
+ paraNames[1] = "user.name";
+ if (recursive) {
+ paraValues[0] = "true";
} else {
- strcpy(recursiveFlag, "true");
- }
- url = (char *) realloc(url, strlen(url) + strlen("&recursive=") + strlen(recursiveFlag) + 1);
- if (!url) {
- return NULL;
+ paraValues[0] = "false";
}
+ paraValues[1] = user;
- strcat(url, "&recursive=");
- strcat(url, recursiveFlag);
- return url;
+ return createQueryURL(host, nnPort, path, "DELETE",
+ 2, paraNames, paraValues, url);
}
-char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user) {
- char *url;
- url = prepareQUERY(host, nnPort, dirsubpath, "SETOWNER", user);
- if (!url) {
- return NULL;
- }
- if(owner != NULL) {
- url = realloc(url,(strlen(url) + strlen("&owner=") + strlen(owner) + 1));
- url = strcat(url,"&owner=");
- url = strcat(url,owner);
- }
- if (group != NULL) {
- url = realloc(url,(strlen(url) + strlen("&group=") + strlen(group) + 1));
- url = strcat(url,"&group=");
- url = strcat(url,group);
- }
- return url;
+int createUrlForCHOWN(const char *host, int nnPort, const char *path,
+ const char *owner, const char *group,
+ const char *user, char **url)
+{
+ const char *paraNames[3], *paraValues[3];
+ paraNames[0] = "owner";
+ paraNames[1] = "group";
+ paraNames[2] = "user.name";
+ paraValues[0] = owner;
+ paraValues[1] = group;
+ paraValues[2] = user;
+
+ return createQueryURL(host, nnPort, path, "SETOWNER",
+ 3, paraNames, paraValues, url);
}
-char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length) {
- char *base_url = prepareQUERY(host, nnPort, dirsubpath, "OPEN", user);
- char *url = (char *) malloc(strlen(base_url) + strlen("&offset=") + 15 + strlen("&length=") + 15);
- if (!url) {
- return NULL;
- }
- sprintf(url, "%s&offset=%ld&length=%ld", base_url, offset, length);
- return url;
+/*
+ * Build the URL for an OPEN request with the offset, length and user.name
+ * query parameters. Returns 0 on success, EIO if a number does not fit its
+ * buffer, or the error returned by createQueryURL.
+ */
+int createUrlForOPEN(const char *host, int nnPort, const char *path,
+ const char *user, size_t offset, size_t length, char **url)
+{
+ int strlength;
+ char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN];
+ const char *paraNames[3], *paraValues[3];
+
+ paraNames[0] = "offset";
+ paraNames[1] = "length";
+ paraNames[2] = "user.name";
+ memset(offsetStr, 0, LONG_STR_LEN);
+ memset(lengthStr, 0, LONG_STR_LEN);
+ // offset and length are size_t, so %zu is the matching conversion
+ strlength = snprintf(offsetStr, LONG_STR_LEN, "%zu", offset);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
+ strlength = snprintf(lengthStr, LONG_STR_LEN, "%zu", length);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
+ paraValues[0] = offsetStr;
+ paraValues[1] = lengthStr;
+ paraValues[2] = user;
+
+ return createQueryURL(host, nnPort, path, "OPEN",
+ 3, paraNames, paraValues, url);
}
-char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user) {
- char *url;
- char *modTime;
- char *acsTime;
- modTime = (char*) malloc(12);
- acsTime = (char*) malloc(12);
- url = prepareQUERY(host, nnPort, dirsubpath, "SETTIMES", user);
- sprintf(modTime,"%lu",mTime);
- sprintf(acsTime,"%lu",aTime);
- url = realloc(url,(strlen(url) + strlen("&modificationtime=") + strlen(modTime) + strlen("&accesstime=") + strlen(acsTime) + 1));
- if (!url) {
- return NULL;
- }
- url = strcat(url, "&modificationtime=");
- url = strcat(url, modTime);
- url = strcat(url,"&accesstime=");
- url = strcat(url, acsTime);
- return url;
+int createUrlForUTIMES(const char *host, int nnPort, const char *path,
+ long unsigned mTime, long unsigned aTime,
+ const char *user, char **url)
+{
+ int strlength;
+ char modTime[LONG_STR_LEN], acsTime[LONG_STR_LEN];
+ const char *paraNames[3], *paraValues[3];
+
+ memset(modTime, 0, LONG_STR_LEN);
+ memset(acsTime, 0, LONG_STR_LEN);
+ strlength = snprintf(modTime, LONG_STR_LEN, "%lu", mTime);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
+ strlength = snprintf(acsTime, LONG_STR_LEN, "%lu", aTime);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
+ paraNames[0] = "modificationtime";
+ paraNames[1] = "accesstime";
+ paraNames[2] = "user.name";
+ paraValues[0] = modTime;
+ paraValues[1] = acsTime;
+ paraValues[2] = user;
+
+ return createQueryURL(host, nnPort, path, "SETTIMES",
+ 3, paraNames, paraValues, url);
}
-char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize) {
- char *url;
- url = prepareQUERY(host, nnPort, dirsubpath, "CREATE", user);
- url = realloc(url, (strlen(url) + strlen("&overwrite=true") + 1));
- if (!url) {
- return NULL;
- }
- url = strcat(url, "&overwrite=true");
+/*
+ * Build the URL for a CREATE request sent to the NameNode. overwrite=true is
+ * always passed. replication and blocksize are formatted only when positive;
+ * otherwise their strings stay empty and createQueryURL skips them.
+ * Returns 0 on success, EIO if a number does not fit its buffer, or the
+ * error returned by createQueryURL.
+ */
+int createUrlForNnWRITE(const char *host, int nnPort,
+ const char *path, const char *user,
+ int16_t replication, size_t blockSize, char **url)
+{
+ int strlength;
+ char repStr[SHORT_STR_LEN], blockSizeStr[LONG_STR_LEN];
+ const char *paraNames[4], *paraValues[4];
+
+ memset(repStr, 0, SHORT_STR_LEN);
+ memset(blockSizeStr, 0, LONG_STR_LEN);
if (replication > 0) {
- url = realloc(url, (strlen(url) + strlen("&replication=") + 6));
- if (!url) {
- return NULL;
+ // int16_t promotes to int when passed to snprintf, so use %d
+ strlength = snprintf(repStr, SHORT_STR_LEN, "%d", replication);
+ if (strlength < 0 || strlength >= SHORT_STR_LEN) {
+ return EIO;
}
- sprintf(url, "%s&replication=%d", url, replication);
}
if (blockSize > 0) {
- url = realloc(url, (strlen(url) + strlen("&blocksize=") + 16));
- if (!url) {
- return NULL;
+ // blockSize is a size_t, so %zu is the matching conversion
+ strlength = snprintf(blockSizeStr, LONG_STR_LEN, "%zu", blockSize);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
}
- sprintf(url, "%s&blocksize=%ld", url, blockSize);
}
- return url;
+ paraNames[0] = "overwrite";
+ paraNames[1] = "replication";
+ paraNames[2] = "blocksize";
+ paraNames[3] = "user.name";
+ paraValues[0] = "true";
+ paraValues[1] = repStr;
+ paraValues[2] = blockSizeStr;
+ paraValues[3] = user;
+
+ return createQueryURL(host, nnPort, path, "CREATE",
+ 4, paraNames, paraValues, url);
}
-char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user) {
- return (prepareQUERY(host, nnPort, dirsubpath, "APPEND", user));
+/*
+ * Build the URL for a SETREPLICATION request. The replication value is
+ * formatted only when positive; otherwise its string stays empty.
+ * Returns 0 on success, EIO if the number does not fit its buffer, or the
+ * error returned by createQueryURL.
+ */
+int createUrlForSETREPLICATION(const char *host, int nnPort,
+ const char *path, int16_t replication,
+ const char *user, char **url)
+{
+ char repStr[SHORT_STR_LEN];
+ const char *paraNames[2], *paraValues[2];
+ int strlength;
+
+ memset(repStr, 0, SHORT_STR_LEN);
+ if (replication > 0) {
+ // int16_t promotes to int when passed to snprintf, so use %d
+ strlength = snprintf(repStr, SHORT_STR_LEN, "%d", replication);
+ if (strlength < 0 || strlength >= SHORT_STR_LEN) {
+ return EIO;
+ }
+ }
+ paraNames[0] = "replication";
+ paraNames[1] = "user.name";
+ paraValues[0] = repStr;
+ paraValues[1] = user;
+
+ return createQueryURL(host, nnPort, path, "SETREPLICATION",
+ 2, paraNames, paraValues, url);
}
-char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user)
+/*
+ * Build the URL for a GET_BLOCK_LOCATIONS request. offset and length are
+ * formatted only when positive; otherwise their strings stay empty.
+ * Returns 0 on success, EIO if a number does not fit its buffer, or the
+ * error returned by createQueryURL.
+ */
+int createUrlForGetBlockLocations(const char *host, int nnPort,
+ const char *path, size_t offset,
+ size_t length, const char *user, char **url)
{
- char *url = prepareQUERY(host, nnPort, path, "SETREPLICATION", user);
- char *replicationNum = (char *) malloc(NUM_OF_REPLICATION_BITS);
- sprintf(replicationNum, "%u", replication);
- url = realloc(url, strlen(url) + strlen("&replication=") + strlen(replicationNum)+ 1);
- if (!url) {
- return NULL;
+ char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN];
+ const char *paraNames[3], *paraValues[3];
+ int strlength;
+
+ memset(offsetStr, 0, LONG_STR_LEN);
+ memset(lengthStr, 0, LONG_STR_LEN);
+ // offset and length are size_t, so %zu is the matching conversion
+ if (offset > 0) {
+ strlength = snprintf(offsetStr, LONG_STR_LEN, "%zu", offset);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
}
+ if (length > 0) {
+ strlength = snprintf(lengthStr, LONG_STR_LEN, "%zu", length);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
+ }
+ paraNames[0] = "offset";
+ paraNames[1] = "length";
+ paraNames[2] = "user.name";
+ paraValues[0] = offsetStr;
+ paraValues[1] = lengthStr;
+ paraValues[2] = user;
+
+ return createQueryURL(host, nnPort, path, "GET_BLOCK_LOCATIONS",
+ 3, paraNames, paraValues, url);
+}
+
+/*
+ * Build the URL for an OPEN request addressed to dnHost:dnPort, with the
+ * offset, length, user.name and namenoderpcaddress query parameters.
+ * offset and length are formatted only when positive; otherwise their
+ * strings stay empty.
+ * Returns 0 on success, EIO if a number does not fit its buffer, or the
+ * error returned by createQueryURL.
+ */
+int createUrlForReadFromDatanode(const char *dnHost, int dnPort,
+ const char *path, size_t offset,
+ size_t length, const char *user,
+ const char *namenodeRpcAddr, char **url)
+{
+ char offsetStr[LONG_STR_LEN], lengthStr[LONG_STR_LEN];
+ const char *paraNames[4], *paraValues[4];
+ int strlength;
+
+ memset(offsetStr, 0, LONG_STR_LEN);
+ memset(lengthStr, 0, LONG_STR_LEN);
+ // offset and length are size_t, so %zu is the matching conversion
+ if (offset > 0) {
+ strlength = snprintf(offsetStr, LONG_STR_LEN, "%zu", offset);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
+ }
+ if (length > 0) {
+ strlength = snprintf(lengthStr, LONG_STR_LEN, "%zu", length);
+ if (strlength < 0 || strlength >= LONG_STR_LEN) {
+ return EIO;
+ }
+ }
+
+ paraNames[0] = "offset";
+ paraNames[1] = "length";
+ paraNames[2] = "user.name";
+ paraNames[3] = "namenoderpcaddress";
+ paraValues[0] = offsetStr;
+ paraValues[1] = lengthStr;
+ paraValues[2] = user;
+ paraValues[3] = namenodeRpcAddr;
- url = strcat(url, "&replication=");
- url = strcat(url, replicationNum);
- return url;
+ return createQueryURL(dnHost, dnPort, path, "OPEN",
+ 4, paraNames, paraValues, url);
}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h?rev=1403173&r1=1403172&r2=1403173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h Mon Oct 29 05:10:29 2012
@@ -20,22 +20,221 @@
#ifndef _HDFS_HTTP_QUERY_H_
#define _HDFS_HTTP_QUERY_H_
-#include <stdint.h>
-#include <stdio.h>
+#include <unistd.h> /* for size_t */
+#include <inttypes.h> /* for int16_t */
-char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user);
-char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user);
-char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user);
-char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user);
-char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user);
-char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user);
-char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user);
-char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user);
-char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length);
-char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user);
-char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize);
-char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user);
-char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user);
+/**
+ * Create the URL for a MKDIR request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the dir to create
+ * @param user User name
+ * @param url Holding the generated URL for MKDIR request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForMKDIR(const char *host, int nnPort,
+ const char *path, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a MKDIR (with mode) request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the dir to create
+ * @param mode Mode of MKDIR
+ * @param user User name
+ * @param url Holding the generated URL for MKDIR request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForMKDIRwithMode(const char *host, int nnPort, const char *path,
+ int mode, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a RENAME request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param srcpath Source path
+ * @param dstpath Destination path
+ * @param user User name
+ * @param url Holding the generated URL for RENAME request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForRENAME(const char *host, int nnPort, const char *srcpath,
+ const char *dstpath, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a CHMOD request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Target path
+ * @param mode New mode for the file
+ * @param user User name
+ * @param url Holding the generated URL for CHMOD request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForCHMOD(const char *host, int nnPort, const char *path,
+ int mode, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a GETFILESTATUS request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the target file
+ * @param user User name
+ * @param url Holding the generated URL for GETFILESTATUS request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForGetFileStatus(const char *host, int nnPort,
+ const char *path, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a LISTSTATUS request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the directory for listing
+ * @param user User name
+ * @param url Holding the generated URL for LISTSTATUS request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForLS(const char *host, int nnPort,
+ const char *path, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a DELETE request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the file to be deleted
+ * @param recursive Whether or not to delete in a recursive way
+ * @param user User name
+ * @param url Holding the generated URL for DELETE request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForDELETE(const char *host, int nnPort, const char *path,
+ int recursive, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a CHOWN request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the target
+ * @param owner New owner
+ * @param group New group
+ * @param user User name
+ * @param url Holding the generated URL for CHOWN request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForCHOWN(const char *host, int nnPort, const char *path,
+ const char *owner, const char *group, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for an OPEN/READ request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the file to read
+ * @param user User name
+ * @param offset Offset for reading (the start position for this read)
+ * @param length Length of the file to read
+ * @param url Holding the generated URL for OPEN/READ request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForOPEN(const char *host, int nnPort, const char *path,
+ const char *user, size_t offset, size_t length,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a UTIMES (update time) request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the file for updating time
+ * @param mTime Modified time to set
+ * @param aTime Access time to set
+ * @param user User name
+ * @param url Holding the generated URL for UTIMES request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForUTIMES(const char *host, int nnPort, const char *path,
+ long unsigned mTime, long unsigned aTime,
+ const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a WRITE/CREATE request (sent to NameNode)
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the file to create
+ * @param user User name
+ * @param replication Number of replication of the file
+ * @param blockSize Size of the block for the file
+ * @param url Holding the generated URL for WRITE request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForNnWRITE(const char *host, int nnPort, const char *path,
+ const char *user, int16_t replication, size_t blockSize,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for an APPEND request (sent to NameNode)
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the file for appending
+ * @param user User name
+ * @param url Holding the generated URL for APPEND request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForNnAPPEND(const char *host, int nnPort,
+ const char *path, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a SETREPLICATION request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the target file
+ * @param replication New replication number
+ * @param user User name
+ * @param url Holding the generated URL for SETREPLICATION request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForSETREPLICATION(const char *host, int nnPort, const char *path,
+ int16_t replication, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
+
+/**
+ * Create the URL for a GET_BLOCK_LOCATIONS request
+ *
+ * @param host The hostname of the NameNode
+ * @param nnPort Port of the NameNode
+ * @param path Path of the target file
+ * @param offset The offset in the file
+ * @param length Length of the file content
+ * @param user User name
+ * @param url Holding the generated URL for GET_BLOCK_LOCATIONS request
+ * @return 0 on success and non-zero value on errors
+ */
+int createUrlForGetBlockLocations(const char *host, int nnPort,
+ const char *path, size_t offset,
+ size_t length, const char *user,
+ char **url) __attribute__ ((warn_unused_result));
#endif //_HDFS_HTTP_QUERY_H_