You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/09/10 15:43:29 UTC
svn commit: r1382836 [2/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/
src/contrib/libwebhdfs/ src/contrib/libwebhdfs/resources/
src/contrib/libwebhdfs/src/
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#include <string.h>
+#include <ctype.h>
+#include <jansson.h>
+#include "hdfs_json_parser.h"
+#include "exception.h"
+
+hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, int *numEntries, const char *operation); //Forward Declaration
+
+static hdfsFileInfo *json_parse_array(json_t *jobj, char *key, hdfsFileInfo *fileStat, int *numEntries, const char *operation) {
+ int arraylen = json_array_size(jobj); //Getting the length of the array
+ *numEntries = arraylen;
+ if (!key) {
+ return NULL;
+ }
+ if(arraylen > 0) {
+ fileStat = (hdfsFileInfo *)realloc(fileStat,sizeof(hdfsFileInfo)*arraylen);
+ }
+ json_t *jvalue;
+ int i;
+ for (i=0; i< arraylen; i++) {
+ jvalue = json_array_get(jobj, i); //Getting the array element at position i
+ if (json_is_array(jvalue)) { // array within an array - program should never come here for now
+ json_parse_array(jvalue, NULL, &fileStat[i], numEntries, operation);
+ }
+ else if (json_is_object(jvalue)) { // program will definitely come over here
+ parseJsonGFS(jvalue, &fileStat[i], numEntries, operation);
+ }
+ else {
+ return NULL; // program will never come over here for now
+ }
+ }
+ *numEntries = arraylen;
+ return fileStat;
+}
+
+int parseBoolean(char *response) {
+ json_t *root;
+ json_error_t error;
+ size_t flags = 0;
+ int result = 0;
+ const char *key;
+ json_t *value;
+ root = json_loads(response, flags, &error);
+ void *iter = json_object_iter(root);
+ while(iter) {
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
+ switch (json_typeof(value)) {
+ case JSON_TRUE:
+ result = 1;
+ break;
+ default:
+ result = 0;
+ break;
+ }
+ iter = json_object_iter_next(root, iter);
+ }
+ return result;
+}
+
+int parseMKDIR(char *response) {
+ return (parseBoolean(response));
+}
+
+int parseRENAME(char *response) {
+ return (parseBoolean(response));
+}
+
+int parseDELETE(char *response) {
+ return (parseBoolean(response));
+}
+
+hdfs_exception_msg *parseJsonException(json_t *jobj) {
+ const char *key;
+ json_t *value;
+ hdfs_exception_msg *exception = NULL;
+
+ exception = (hdfs_exception_msg *) calloc(1, sizeof(hdfs_exception_msg));
+ if (!exception) {
+ return NULL;
+ }
+
+ void *iter = json_object_iter(jobj);
+ while (iter) {
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
+
+ if (!strcmp(key, "exception")) {
+ exception->exception = json_string_value(value);
+ } else if (!strcmp(key, "javaClassName")) {
+ exception->javaClassName = json_string_value(value);
+ } else if (!strcmp(key, "message")) {
+ exception->message = json_string_value(value);
+ }
+
+ iter = json_object_iter_next(jobj, iter);
+ }
+
+ return exception;
+}
+
+hdfs_exception_msg *parseException(const char *content) {
+ if (!content) {
+ return NULL;
+ }
+
+ json_error_t error;
+ size_t flags = 0;
+ const char *key;
+ json_t *value;
+ json_t *jobj = json_loads(content, flags, &error);
+
+ if (!jobj) {
+ fprintf(stderr, "JSon parsing failed\n");
+ return NULL;
+ }
+ void *iter = json_object_iter(jobj);
+ while(iter) {
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
+
+ if (!strcmp(key, "RemoteException") && json_typeof(value) == JSON_OBJECT) {
+ return parseJsonException(value);
+ }
+ iter = json_object_iter_next(jobj, iter);
+ }
+ return NULL;
+}
+
+hdfsFileInfo *parseJsonGFS(json_t *jobj, hdfsFileInfo *fileStat, int *numEntries, const char *operation) {
+ const char *tempstr;
+ const char *key;
+ json_t *value;
+ void *iter = json_object_iter(jobj);
+ while(iter) {
+ key = json_object_iter_key(iter);
+ value = json_object_iter_value(iter);
+
+ switch (json_typeof(value)) {
+ case JSON_INTEGER:
+ if(!strcmp(key,"accessTime")) {
+ fileStat->mLastAccess = (time_t)(json_integer_value(value)/1000);
+ } else if (!strcmp(key,"blockSize")) {
+ fileStat->mBlockSize = (tOffset)json_integer_value(value);
+ } else if (!strcmp(key,"length")) {
+ fileStat->mSize = (tOffset)json_integer_value(value);
+ } else if(!strcmp(key,"modificationTime")) {
+ fileStat->mLastMod = (time_t)(json_integer_value(value)/1000);
+ } else if (!strcmp(key,"replication")) {
+ fileStat->mReplication = (short)json_integer_value(value);
+ }
+ break;
+
+ case JSON_STRING:
+ if(!strcmp(key,"group")) {
+ fileStat->mGroup=(char *)json_string_value(value);
+ } else if (!strcmp(key,"owner")) {
+ fileStat->mOwner=(char *)json_string_value(value);
+ } else if (!strcmp(key,"pathSuffix")) {
+ fileStat->mName=(char *)json_string_value(value);
+ } else if (!strcmp(key,"permission")) {
+ tempstr=(char *)json_string_value(value);
+ fileStat->mPermissions = (short)strtol(tempstr,(char **)NULL,8);
+ } else if (!strcmp(key,"type")) {
+ char *cvalue = (char *)json_string_value(value);
+ if (!strcmp(cvalue, "DIRECTORY")) {
+ fileStat->mKind = kObjectKindDirectory;
+ } else {
+ fileStat->mKind = kObjectKindFile;
+ }
+ }
+ break;
+
+ case JSON_OBJECT:
+ if(!strcmp(key,"FileStatus")) {
+ parseJsonGFS(value, fileStat, numEntries, operation);
+ } else if (!strcmp(key,"FileStatuses")) {
+ fileStat = parseJsonGFS(value, &fileStat[0], numEntries, operation);
+ } else if (!strcmp(key,"RemoteException")) {
+ //Besides returning NULL, we also need to print the exception information
+ hdfs_exception_msg *exception = parseJsonException(value);
+ if (exception) {
+ errno = printExceptionWeb(exception, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
+ }
+
+ if(fileStat != NULL) {
+ free(fileStat);
+ fileStat = NULL;
+ }
+ }
+ break;
+
+ case JSON_ARRAY:
+ if (!strcmp(key,"FileStatus")) {
+ fileStat = json_parse_array(value,(char *) key,fileStat,numEntries, operation);
+ }
+ break;
+
+ default:
+ if(fileStat != NULL) {
+ free(fileStat);
+ fileStat = NULL;
+ }
+ }
+ iter = json_object_iter_next(jobj, iter);
+ }
+ return fileStat;
+}
+
+
+int checkHeader(char *header, const char *content, const char *operation) {
+ char *result = NULL;
+ char delims[] = ":";
+ char *responseCode= "200 OK";
+ if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) {
+ return 0;
+ }
+ if(!(strstr(header, responseCode)) || !(header = strstr(header, "Content-Length"))) {
+ hdfs_exception_msg *exc = parseException(content);
+ if (exc) {
+ errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
+ }
+ return 0;
+ }
+ result = strtok(header, delims);
+ result = strtok(NULL,delims);
+ while (isspace(*result)) {
+ result++;
+ }
+ if(strcmp(result,"0")) { //Content-Length should be equal to 0
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+int parseOPEN(const char *header, const char *content) {
+ const char *responseCode1 = "307 TEMPORARY_REDIRECT";
+ const char *responseCode2 = "200 OK";
+ if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) {
+ return -1;
+ }
+ if(!(strstr(header,responseCode1) && strstr(header, responseCode2))) {
+ hdfs_exception_msg *exc = parseException(content);
+ if (exc) {
+ //if the exception is an IOException and it is because the offset is out of the range
+ //do not print out the exception
+ if (!strcasecmp(exc->exception, "IOException") && strstr(exc->message, "out of the range")) {
+ return 0;
+ }
+ errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (OPEN)");
+ }
+ return -1;
+ }
+
+ return 1;
+}
+
+int parseCHMOD(char *header, const char *content) {
+ return checkHeader(header, content, "CHMOD");
+}
+
+
+int parseCHOWN(char *header, const char *content) {
+ return checkHeader(header, content, "CHOWN");
+}
+
+int parseUTIMES(char *header, const char *content) {
+ return checkHeader(header, content, "UTIMES");
+}
+
+
+int checkIfRedirect(const char *const headerstr, const char *content, const char *operation) {
+ char *responseCode = "307 TEMPORARY_REDIRECT";
+ char * locTag = "Location";
+ char * tempHeader;
+ if(headerstr == '\0' || strncmp(headerstr,"HTTP/", 5)) {
+ return 0;
+ }
+ if(!(tempHeader = strstr(headerstr,responseCode))) {
+ //process possible exception information
+ hdfs_exception_msg *exc = parseException(content);
+ if (exc) {
+ errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (%s)", operation);
+ }
+ return 0;
+ }
+ if(!(strstr(tempHeader,locTag))) {
+ return 0;
+ }
+ return 1;
+}
+
+
+int parseNnWRITE(const char *header, const char *content) {
+ return checkIfRedirect(header, content, "Write(NameNode)");
+}
+
+
+int parseNnAPPEND(const char *header, const char *content) {
+ return checkIfRedirect(header, content, "Append(NameNode)");
+}
+
+char *parseDnLoc(char *content) {
+ char delims[] = "\r\n";
+ char *url = NULL;
+ char *DnLocation = NULL;
+ char *savepter;
+ DnLocation = strtok_r(content, delims, &savepter);
+ while (DnLocation && strncmp(DnLocation, "Location:", strlen("Location:"))) {
+ DnLocation = strtok_r(NULL, delims, &savepter);
+ }
+ if (!DnLocation) {
+ return NULL;
+ }
+ DnLocation = strstr(DnLocation, "http");
+ if (!DnLocation) {
+ return NULL;
+ }
+ url = malloc(strlen(DnLocation) + 1);
+ if (!url) {
+ return NULL;
+ }
+ strcpy(url, DnLocation);
+ return url;
+}
+
+int parseDnWRITE(const char *header, const char *content) {
+ char *responseCode = "201 Created";
+ fprintf(stderr, "\nheaderstr is: %s\n", header);
+ if(header == '\0' || strncmp(header,"HTTP/",strlen("HTTP/"))) {
+ return 0;
+ }
+ if(!(strstr(header,responseCode))) {
+ hdfs_exception_msg *exc = parseException(content);
+ if (exc) {
+ errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (WRITE(DataNode))");
+ }
+ return 0;
+ }
+ return 1;
+}
+
+int parseDnAPPEND(const char *header, const char *content) {
+ char *responseCode = "200 OK";
+ if(header == '\0' || strncmp(header, "HTTP/", strlen("HTTP/"))) {
+ return 0;
+ }
+ if(!(strstr(header, responseCode))) {
+ hdfs_exception_msg *exc = parseException(content);
+ if (exc) {
+ errno = printExceptionWeb(exc, PRINT_EXC_ALL, "Calling WEBHDFS (APPEND(DataNode))");
+ }
+ return 0;
+ }
+ return 1;
+}
+
+hdfsFileInfo *parseGFS(char *str, hdfsFileInfo *fileStat, int *numEntries) {
+ json_error_t error;
+ size_t flags = 0;
+ json_t *jobj = json_loads(str, flags, &error);
+ fileStat = parseJsonGFS(jobj, fileStat, numEntries, "GETPATHSTATUS/LISTSTATUS");
+ return fileStat;
+}
+
+int parseSETREPLICATION(char *response) {
+ return (parseBoolean(response));
+}
+
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_JSON_PARSER_H_
+#define _HDFS_JSON_PARSER_H_
+#include "webhdfs.h"
+
+int parseMKDIR(char *response);
+int parseRENAME(char *response);
+int parseDELETE (char *response);
+int parseSETREPLICATION(char *response);
+
+int parseOPEN(const char *header, const char *content);
+
+int parseNnWRITE(const char *header, const char *content);
+int parseDnWRITE(const char *header, const char *content);
+int parseNnAPPEND(const char *header, const char *content);
+int parseDnAPPEND(const char *header, const char *content);
+
+char* parseDnLoc(char *content);
+
+hdfsFileInfo *parseGFS(char *response, hdfsFileInfo *fileStat, int *numEntries);
+
+int parseCHOWN (char *header, const char *content);
+int parseCHMOD (char *header, const char *content);
+int parseUTIMES(char *header, const char *content);
+
+#endif //_FUSE_JSON_PARSER_H
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,1113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <jni.h>
+#include "webhdfs.h"
+#include "hdfs_http_client.h"
+#include "hdfs_http_query.h"
+#include "hdfs_json_parser.h"
+#include "jni_helper.h"
+#include "exception.h"
+
+#define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration"
+#define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
+#define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
+
+static void initFileinfo(hdfsFileInfo *fileInfo) {
+ if (fileInfo) {
+ fileInfo->mKind = kObjectKindFile;
+ fileInfo->mName = NULL;
+ fileInfo->mLastMod = 0;
+ fileInfo->mSize = 0;
+ fileInfo->mReplication = 0;
+ fileInfo->mBlockSize = 0;
+ fileInfo->mOwner = NULL;
+ fileInfo->mGroup = NULL;
+ fileInfo->mPermissions = 0;
+ fileInfo->mLastAccess = 0;
+ }
+}
+
+static webhdfsBuffer *initWebHdfsBuffer() {
+ webhdfsBuffer *buffer = (webhdfsBuffer *) calloc(1, sizeof(webhdfsBuffer));
+ if (!buffer) {
+ fprintf(stderr, "Fail to allocate memory for webhdfsBuffer.\n");
+ return NULL;
+ }
+ buffer->remaining = 0;
+ buffer->offset = 0;
+ buffer->wbuffer = NULL;
+ buffer->closeFlag = 0;
+ buffer->openFlag = 0;
+ pthread_mutex_init(&buffer->writeMutex, NULL);
+ pthread_cond_init(&buffer->newwrite_or_close, NULL);
+ pthread_cond_init(&buffer->transfer_finish, NULL);
+ return buffer;
+}
+
+static webhdfsBuffer *resetWebhdfsBuffer(webhdfsBuffer *wb, const char *buffer, size_t length) {
+ if (buffer && length > 0) {
+ pthread_mutex_lock(&wb->writeMutex);
+ wb->wbuffer = buffer;
+ wb->offset = 0;
+ wb->remaining = length;
+ pthread_cond_signal(&wb->newwrite_or_close);
+ while (wb->remaining != 0) {
+ pthread_cond_wait(&wb->transfer_finish, &wb->writeMutex);
+ }
+ pthread_mutex_unlock(&wb->writeMutex);
+ }
+ return wb;
+}
+
+static void freeWebhdfsBuffer(webhdfsBuffer *buffer) {
+ if (buffer) {
+ int des = pthread_cond_destroy(&buffer->newwrite_or_close);
+ if (des == EBUSY) {
+ fprintf(stderr, "The condition newwrite_or_close is still referenced!\n");
+ } else if (des == EINVAL) {
+ fprintf(stderr, "The condition newwrite_or_close is invalid!\n");
+ }
+ des = pthread_cond_destroy(&buffer->transfer_finish);
+ if (des == EBUSY) {
+ fprintf(stderr, "The condition transfer_finish is still referenced!\n");
+ } else if (des == EINVAL) {
+ fprintf(stderr, "The condition transfer_finish is invalid!\n");
+ }
+ if (des == EBUSY) {
+ fprintf(stderr, "The condition close_clean is still referenced!\n");
+ } else if (des == EINVAL) {
+ fprintf(stderr, "The condition close_clean is invalid!\n");
+ }
+ des = pthread_mutex_destroy(&buffer->writeMutex);
+ if (des == EBUSY) {
+ fprintf(stderr, "The mutex is still locked or referenced!\n");
+ }
+ free(buffer);
+ buffer = NULL;
+ }
+}
+
+static void freeWebFileHandle(struct webhdfsFileHandle * handle) {
+ if (handle) {
+ freeWebhdfsBuffer(handle->uploadBuffer);
+ if (handle->datanode) {
+ free(handle->datanode);
+ }
+ if (handle->absPath) {
+ free(handle->absPath);
+ }
+ free(handle);
+ handle = NULL;
+ }
+}
+
+struct hdfsBuilder *hdfsNewBuilder(void)
+{
+ struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
+ if (!bld) {
+ return NULL;
+ }
+ hdfsSetWorkingDirectory(bld, "/");
+ return bld;
+}
+
+void hdfsFreeBuilder(struct hdfsBuilder *bld)
+{
+ if (bld && bld->workingDir) {
+ free(bld->workingDir);
+ }
+ free(bld);
+}
+
+void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
+{
+ if (bld) {
+ bld->forceNewInstance = 1;
+ }
+}
+
+void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
+{
+ if (bld) {
+ bld->nn = nn;
+ bld->nn_jni = nn;
+ }
+}
+
+void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
+{
+ if (bld) {
+ bld->port = port;
+ }
+}
+
+void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
+{
+ if (bld) {
+ bld->userName = userName;
+ }
+}
+
+void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
+ const char *kerbTicketCachePath)
+{
+ if (bld) {
+ bld->kerbTicketCachePath = kerbTicketCachePath;
+ }
+}
+
+hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user)
+{
+ struct hdfsBuilder* bld = hdfsNewBuilder();
+ if (!bld) {
+ return NULL;
+ }
+ hdfsBuilderSetNameNode(bld, nn);
+ hdfsBuilderSetNameNodePort(bld, port);
+ hdfsBuilderSetUserName(bld, user);
+ return hdfsBuilderConnect(bld);
+}
+
+hdfsFS hdfsConnect(const char* nn, tPort port)
+{
+ return hdfsConnectAsUser(nn, port, NULL);
+}
+
+hdfsFS hdfsConnectNewInstance(const char* nn, tPort port)
+{
+ struct hdfsBuilder* bld = (struct hdfsBuilder *) hdfsConnect(nn, port);
+ if (!bld) {
+ return NULL;
+ }
+ hdfsBuilderSetForceNewInstance(bld);
+ return bld;
+}
+
+hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
+ const char *user)
+{
+ struct hdfsBuilder *bld = hdfsNewBuilder();
+ if (!bld)
+ return NULL;
+ hdfsBuilderSetNameNode(bld, host);
+ hdfsBuilderSetNameNodePort(bld, port);
+ hdfsBuilderSetUserName(bld, user);
+ hdfsBuilderSetForceNewInstance(bld);
+ return hdfsBuilderConnect(bld);
+}
+
+const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
+ char *buf, size_t bufLen);
+
+hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
+{
+ if (!bld) {
+ return NULL;
+ }
+ // if the hostname is null for the namenode, set it to localhost
+ //only handle bld->nn
+ if (bld->nn == NULL) {
+ bld->nn = "localhost";
+ } else {
+ /* check whether the hostname of the namenode (nn in hdfsBuilder) has already contained the port */
+ const char *lastColon = rindex(bld->nn, ':');
+ if (lastColon && (strspn(lastColon + 1, "0123456789") == strlen(lastColon + 1))) {
+ fprintf(stderr, "port %d was given, but URI '%s' already "
+ "contains a port!\n", bld->port, bld->nn);
+ char *newAddr = (char *)malloc(strlen(bld->nn) - strlen(lastColon) + 1);
+ if (!newAddr) {
+ return NULL;
+ }
+ strncpy(newAddr, bld->nn, strlen(bld->nn) - strlen(lastColon));
+ newAddr[strlen(bld->nn) - strlen(lastColon)] = '\0';
+ free(bld->nn);
+ bld->nn = newAddr;
+ }
+ }
+
+ /* if the namenode is "default" and/or the port of namenode is 0, get the default namenode/port by using JNI */
+ if (bld->port == 0 || !strcasecmp("default", bld->nn)) {
+ JNIEnv *env = 0;
+ jobject jHDFSConf = NULL, jAddress = NULL;
+ jvalue jVal;
+ jthrowable jthr = NULL;
+ int ret = 0;
+ char buf[512];
+
+ //Get the JNIEnv* corresponding to current thread
+ env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ free(bld);
+ bld = NULL;
+ return NULL;
+ }
+
+ // jHDFSConf = new HDFSConfiguration();
+ jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+
+ jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_NAMENODE, "getHttpAddress",
+ "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
+ jHDFSConf);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done; //free(bld), deleteReference for jHDFSConf
+ }
+ jAddress = jVal.l;
+
+ if (bld->port == 0) {
+ jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
+ JAVA_INETSOCKETADDRESS, "getPort", "()I");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ bld->port = jVal.i;
+ }
+
+ if (!strcasecmp("default", bld->nn)) {
+ jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
+ JAVA_INETSOCKETADDRESS, "getHostName", "()Ljava/lang/String;");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ bld->nn = (const char*) ((*env)->GetStringUTFChars(env, jVal.l, NULL));
+ }
+
+ done:
+ destroyLocalReference(env, jHDFSConf);
+ destroyLocalReference(env, jAddress);
+ if (ret) { //if there is error/exception, we free the builder and return NULL
+ free(bld);
+ bld = NULL;
+ }
+ }
+
+ //for debug
+ fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port);
+ return bld;
+}
+
+int hdfsDisconnect(hdfsFS fs)
+{
+ if (fs == NULL) {
+ errno = EBADF;
+ return -1;
+ } else {
+ free(fs);
+ fs = NULL;
+ }
+ return 0;
+}
+
+char *getAbsolutePath(hdfsFS fs, const char *path) {
+ if (fs == NULL || path == NULL) {
+ return NULL;
+ }
+ char *absPath = NULL;
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+
+ if ('/' != *path && bld->workingDir) {
+ absPath = (char *)malloc(strlen(bld->workingDir) + strlen(path) + 1);
+ if (!absPath) {
+ return NULL;
+ }
+ absPath = strcpy(absPath, bld->workingDir);
+ absPath = strcat(absPath, path);
+ return absPath;
+ } else {
+ absPath = (char *)malloc(strlen(path) + 1);
+ if (!absPath) {
+ return NULL;
+ }
+ absPath = strcpy(absPath, path);
+ return absPath;
+ }
+}
+
+int hdfsCreateDirectory(hdfsFS fs, const char* path)
+{
+ if (fs == NULL || path == NULL) {
+ return -1;
+ }
+
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return -1;
+ }
+
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url = NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ if(!((url = prepareMKDIR(bld->nn, bld->port, absPath, bld->userName))
+ && (resp = launchMKDIR(url))
+ && (parseMKDIR(resp->body->content)))) {
+ ret = -1;
+ }
+
+ freeResponse(resp);
+ free(url);
+ free(absPath);
+ return ret;
+}
+
+int hdfsChmod(hdfsFS fs, const char* path, short mode)
+{
+ if (fs == NULL || path == NULL) {
+ return -1;
+ }
+
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return -1;
+ }
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url=NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ if(!((url = prepareCHMOD(bld->nn, bld->port, absPath, (int)mode, bld->userName))
+ && (resp = launchCHMOD(url))
+ && (parseCHMOD(resp->header->content, resp->body->content)))) {
+ ret = -1;
+ }
+
+ freeResponse(resp);
+ free(absPath);
+ free(url);
+ return ret;
+}
+
+int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
+{
+ if (fs == NULL || path == NULL) {
+ return -1;
+ }
+
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return -1;
+ }
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url=NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ if(!((url = prepareCHOWN(bld->nn, bld->port, absPath, owner, group, bld->userName))
+ && (resp = launchCHOWN(url))
+ && (parseCHOWN(resp->header->content, resp->body->content)))) {
+ ret = -1;
+ }
+
+ freeResponse(resp);
+ free(absPath);
+ free(url);
+ return ret;
+}
+
+int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
+{
+ if (fs == NULL || oldPath == NULL || newPath == NULL) {
+ return -1;
+ }
+
+ char *oldAbsPath = getAbsolutePath(fs, oldPath);
+ if (!oldAbsPath) {
+ return -1;
+ }
+ char *newAbsPath = getAbsolutePath(fs, newPath);
+ if (!newAbsPath) {
+ return -1;
+ }
+
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url=NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ if(!((url = prepareRENAME(bld->nn, bld->port, oldAbsPath, newAbsPath, bld->userName))
+ && (resp = launchRENAME(url))
+ && (parseRENAME(resp->body->content)))) {
+ ret = -1;
+ }
+
+ freeResponse(resp);
+ free(oldAbsPath);
+ free(newAbsPath);
+ free(url);
+ return ret;
+}
+
+hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
+{
+ if (fs == NULL || path == NULL) {
+ return NULL;
+ }
+
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return NULL;
+ }
+
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url=NULL;
+ Response resp = NULL;
+ int numEntries = 0;
+ int ret = 0;
+
+ hdfsFileInfo * fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
+ if (!fileInfo) {
+ ret = -1;
+ goto done;
+ }
+ initFileinfo(fileInfo);
+
+ if(!((url = prepareGFS(bld->nn, bld->port, absPath, bld->userName))
+ && (resp = launchGFS(url))
+ && (fileInfo = parseGFS(resp->body->content, fileInfo, &numEntries)))) {
+ ret = -1;
+ goto done;
+ }
+
+done:
+ freeResponse(resp);
+ free(absPath);
+ free(url);
+
+ if (ret == 0) {
+ return fileInfo;
+ } else {
+ free(fileInfo);
+ return NULL;
+ }
+}
+
+hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
+{
+ if (fs == NULL || path == NULL) {
+ return NULL;
+ }
+
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return NULL;
+ }
+
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url = NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ hdfsFileInfo * fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
+ if (!fileInfo) {
+ ret = -1;
+ goto done;
+ }
+
+ if(!((url = prepareLS(bld->nn, bld->port, absPath, bld->userName))
+ && (resp = launchLS(url))
+ && (fileInfo = parseGFS(resp->body->content, fileInfo, numEntries)))) {
+ ret = -1;
+ goto done;
+ }
+
+done:
+ freeResponse(resp);
+ free(absPath);
+ free(url);
+
+ if (ret == 0) {
+ return fileInfo;
+ } else {
+ free(fileInfo);
+ return NULL;
+ }
+}
+
+int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
+{
+ if (fs == NULL || path == NULL) {
+ return -1;
+ }
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return -1;
+ }
+
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url = NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ if(!((url = prepareSETREPLICATION(bld->nn, bld->port, absPath, replication, bld->userName))
+ && (resp = launchSETREPLICATION(url))
+ && (parseSETREPLICATION(resp->body->content)))) {
+ ret = -1;
+ }
+
+ freeResponse(resp);
+ free(absPath);
+ free(url);
+ return ret;
+}
+
+void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
+{
+ //Free the mName, mOwner, and mGroup
+ int i;
+ for (i=0; i < numEntries; ++i) {
+ if (hdfsFileInfo[i].mName) {
+ free(hdfsFileInfo[i].mName);
+ }
+ if (hdfsFileInfo[i].mOwner) {
+ free(hdfsFileInfo[i].mOwner);
+ }
+ if (hdfsFileInfo[i].mGroup) {
+ free(hdfsFileInfo[i].mGroup);
+ }
+ }
+
+ //Free entire block
+ free(hdfsFileInfo);
+ hdfsFileInfo = NULL;
+}
+
+int hdfsDelete(hdfsFS fs, const char* path, int recursive)
+{
+ if (fs == NULL || path == NULL) {
+ return -1;
+ }
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return -1;
+ }
+
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url = NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ if(!((url = prepareDELETE(bld->nn, bld->port, absPath, recursive, bld->userName))
+ && (resp = launchDELETE(url))
+ && (parseDELETE(resp->body->content)))) {
+ ret = -1;
+ }
+
+ freeResponse(resp);
+ free(absPath);
+ free(url);
+ return ret;
+}
+
+int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
+{
+ if (fs == NULL || path == NULL) {
+ return -1;
+ }
+ char *absPath = getAbsolutePath(fs, path);
+ if (!absPath) {
+ return -1;
+ }
+
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url = NULL;
+ Response resp = NULL;
+ int ret = 0;
+
+ if(!((url = prepareUTIMES(bld->nn, bld->port, absPath, mtime, atime, bld->userName))
+ && (resp = launchUTIMES(url))
+ && (parseUTIMES(resp->header->content, resp->body->content)))) {
+ ret = -1;
+ }
+
+ freeResponse(resp);
+ free(absPath);
+ free(url);
+ return ret;
+}
+
+int hdfsExists(hdfsFS fs, const char *path)
+{
+ hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, path);
+ if (fileInfo) {
+ hdfsFreeFileInfo(fileInfo, 1);
+ return 0;
+ } else {
+ return -1;
+ }
+}
+
+typedef struct {
+ char *url;
+ webhdfsBuffer *uploadBuffer;
+ int flags;
+ Response resp;
+} threadData;
+
+static void freeThreadData(threadData *data) {
+ if (data) {
+ if (data->url) {
+ free(data->url);
+ }
+ if (data->resp) {
+ freeResponse(data->resp);
+ }
+ //the uploadBuffer would be freed by freeWebFileHandle()
+ free(data);
+ data = NULL;
+ }
+}
+
+static void *writeThreadOperation(void *v) {
+ threadData *data = (threadData *) v;
+ if (data->flags & O_APPEND) {
+ data->resp = launchDnAPPEND(data->url, data->uploadBuffer);
+ } else {
+ data->resp = launchDnWRITE(data->url, data->uploadBuffer);
+ }
+ return data;
+}
+
+hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
+ int bufferSize, short replication, tSize blockSize)
+{
+ /*
+ * the original version of libhdfs based on JNI store a fsinputstream/fsoutputstream in the hdfsFile
+ * in libwebhdfs that is based on webhdfs, we store (absolute_path, buffersize, replication, blocksize) in it
+ */
+ if (fs == NULL || path == NULL) {
+ return NULL;
+ }
+
+ int accmode = flags & O_ACCMODE;
+ if (accmode == O_RDWR) {
+ fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
+ errno = ENOTSUP;
+ return NULL;
+ }
+
+ if ((flags & O_CREAT) && (flags & O_EXCL)) {
+ fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
+ }
+
+ hdfsFile hdfsFileHandle = (hdfsFile) calloc(1, sizeof(struct hdfsFile_internal));
+ if (!hdfsFileHandle) {
+ return NULL;
+ }
+ int ret = 0;
+ hdfsFileHandle->flags = flags;
+ hdfsFileHandle->type = accmode == O_RDONLY ? INPUT : OUTPUT;
+ hdfsFileHandle->offset = 0;
+ struct webhdfsFileHandle *webhandle = (struct webhdfsFileHandle *) calloc(1, sizeof(struct webhdfsFileHandle));
+ if (!webhandle) {
+ ret = -1;
+ goto done;
+ }
+ webhandle->bufferSize = bufferSize;
+ webhandle->replication = replication;
+ webhandle->blockSize = blockSize;
+ webhandle->absPath = getAbsolutePath(fs, path);
+ if (!webhandle->absPath) {
+ ret = -1;
+ goto done;
+ }
+ hdfsFileHandle->file = webhandle;
+
+ //for write/append, need to connect to the namenode
+ //and get the url of corresponding datanode
+ if (hdfsFileHandle->type == OUTPUT) {
+ webhandle->uploadBuffer = initWebHdfsBuffer();
+ if (!webhandle->uploadBuffer) {
+ ret = -1;
+ goto done;
+ }
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ char *url = NULL;
+ Response resp = NULL;
+ int append = flags & O_APPEND;
+ int create = append ? 0 : 1;
+
+ //if create: send create request to NN
+ if (create) {
+ url = prepareNnWRITE(bld->nn, bld->port, webhandle->absPath, bld->userName, webhandle->replication, webhandle->blockSize);
+ } else if (append) {
+ url = prepareNnAPPEND(bld->nn, bld->port, webhandle->absPath, bld->userName);
+ }
+ if (!url) {
+ fprintf(stderr,
+ "fail to create the url connecting to namenode for file creation/appending\n");
+ ret = -1;
+ goto done;
+ }
+
+ if (create) {
+ resp = launchNnWRITE(url);
+ } else if (append) {
+ resp = launchNnAPPEND(url);
+ }
+ if (!resp) {
+ fprintf(stderr,
+ "fail to get the response from namenode for file creation/appending\n");
+ free(url);
+ ret = -1;
+ goto done;
+ }
+
+ int parseRet = 0;
+ if (create) {
+ parseRet = parseNnWRITE(resp->header->content, resp->body->content);
+ } else if (append) {
+ parseRet = parseNnAPPEND(resp->header->content, resp->body->content);
+ }
+ if (!parseRet) {
+ fprintf(stderr,
+ "fail to parse the response from namenode for file creation/appending\n");
+ free(url);
+ freeResponse(resp);
+ ret = -1;
+ goto done;
+ }
+
+ free(url);
+ url = parseDnLoc(resp->header->content);
+ if (!url) {
+ fprintf(stderr,
+ "fail to get the datanode url from namenode for file creation/appending\n");
+ freeResponse(resp);
+ ret = -1;
+ return NULL;
+ }
+ freeResponse(resp);
+ //store the datanode url in the file handle
+ webhandle->datanode = strdup(url);
+
+ //create a new thread for performing the http transferring
+ threadData *data = (threadData *) calloc(1, sizeof(threadData));
+ if (!data) {
+ ret = -1;
+ goto done;
+ }
+ data->url = strdup(url);
+ data->flags = flags;
+ data->uploadBuffer = webhandle->uploadBuffer;
+ free(url);
+ ret = pthread_create(&webhandle->connThread, NULL, writeThreadOperation, data);
+ if (ret) {
+ fprintf(stderr, "Failed to create the writing thread.\n");
+ } else {
+ webhandle->uploadBuffer->openFlag = 1;
+ }
+ }
+
+done:
+ if (ret == 0) {
+ return hdfsFileHandle;
+ } else {
+ freeWebFileHandle(webhandle);
+ free(hdfsFileHandle);
+ return NULL;
+ }
+}
+
+tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length)
+{
+ if (length == 0) {
+ return 0;
+ }
+ if (fs == NULL || file == NULL || file->type != OUTPUT || length < 0) {
+ return -1;
+ }
+
+ struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file;
+ if (wfile->uploadBuffer && wfile->uploadBuffer->openFlag) {
+ resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length);
+ return length;
+ } else {
+ fprintf(stderr, "Error: have not opened the file %s for writing yet.\n", wfile->absPath);
+ return -1;
+ }
+}
+
+int hdfsCloseFile(hdfsFS fs, hdfsFile file)
+{
+ int ret = 0;
+ fprintf(stderr, "to close file...\n");
+ if (file->type == OUTPUT) {
+ void *respv;
+ threadData *tdata;
+ struct webhdfsFileHandle *wfile = (struct webhdfsFileHandle *) file->file;
+ pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex));
+ wfile->uploadBuffer->closeFlag = 1;
+ pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close);
+ pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex));
+
+ //waiting for the writing thread to terminate
+ ret = pthread_join(wfile->connThread, &respv);
+ if (ret) {
+ fprintf(stderr, "Error (code %d) when pthread_join.\n", ret);
+ }
+ //parse the response
+ tdata = (threadData *) respv;
+ if (!tdata) {
+ fprintf(stderr, "Response from the writing thread is NULL.\n");
+ ret = -1;
+ }
+ if (file->flags & O_APPEND) {
+ parseDnAPPEND(tdata->resp->header->content, tdata->resp->body->content);
+ } else {
+ parseDnWRITE(tdata->resp->header->content, tdata->resp->body->content);
+ }
+ //free the threaddata
+ freeThreadData(tdata);
+ }
+
+ fprintf(stderr, "To clean the webfilehandle...\n");
+ if (file) {
+ freeWebFileHandle(file->file);
+ free(file);
+ file = NULL;
+ fprintf(stderr, "Cleaned the webfilehandle...\n");
+ }
+ return ret;
+}
+
+int hdfsFileIsOpenForRead(hdfsFile file)
+{
+ return (file->type == INPUT);
+}
+
+int hdfsFileIsOpenForWrite(hdfsFile file)
+{
+ return (file->type == OUTPUT);
+}
+
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
+{
+ if (length == 0) {
+ return 0;
+ }
+ if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL || length < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ struct hdfsBuilder *bld = (struct hdfsBuilder *) fs;
+ struct webhdfsFileHandle *webFile = (struct webhdfsFileHandle *) file->file;
+ char *url = NULL;
+ Response resp = NULL;
+ int openResult = -1;
+
+ resp = (Response) calloc(1, sizeof(*resp));
+ if (!resp) {
+ return -1;
+ }
+ resp->header = initResponseBuffer();
+ resp->body = initResponseBuffer();
+ resp->body->content = buffer;
+ resp->body->remaining = length;
+
+ if (!((url = prepareOPEN(bld->nn, bld->port, webFile->absPath, bld->userName, file->offset, length))
+ && (resp = launchOPEN(url, resp))
+ && ((openResult = parseOPEN(resp->header->content, resp->body->content)) > 0))) {
+ free(url);
+ freeResponseBuffer(resp->header);
+ if (openResult == 0) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+ size_t readSize = resp->body->offset;
+ file->offset += readSize;
+
+ freeResponseBuffer(resp->header);
+ free(resp->body);
+ free(resp);
+ free(url);
+ return readSize;
+}
+
+int hdfsAvailable(hdfsFS fs, hdfsFile file)
+{
+ if (!file || !fs) {
+ return -1;
+ }
+ struct webhdfsFileHandle *wf = (struct webhdfsFileHandle *) file->file;
+ if (!wf) {
+ return -1;
+ }
+ hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, wf->absPath);
+ if (fileInfo) {
+ int available = (int)(fileInfo->mSize - file->offset);
+ hdfsFreeFileInfo(fileInfo, 1);
+ return available;
+ } else {
+ return -1;
+ }
+}
+
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos)
+{
+ if (!fs || !file || desiredPos < 0) {
+ return -1;
+ }
+ struct webhdfsFileHandle *wf = (struct webhdfsFileHandle *) file->file;
+ if (!wf) {
+ return -1;
+ }
+ hdfsFileInfo *fileInfo = hdfsGetPathInfo(fs, wf->absPath);
+ int ret = 0;
+ if (fileInfo) {
+ if (fileInfo->mSize < desiredPos) {
+ errno = ENOTSUP;
+ fprintf(stderr,
+ "hdfsSeek for %s failed since the desired position %lld is beyond the size of the file %lld\n",
+ wf->absPath, desiredPos, fileInfo->mSize);
+ ret = -1;
+ } else {
+ file->offset = desiredPos;
+ }
+ hdfsFreeFileInfo(fileInfo, 1);
+ return ret;
+ } else {
+ return -1;
+ }
+}
+
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length)
+{
+ if (!fs || !file || file->type != INPUT || position < 0 || !buffer || length < 0) {
+ return -1;
+ }
+ file->offset = position;
+ return hdfsRead(fs, file, buffer, length);
+}
+
+tOffset hdfsTell(hdfsFS fs, hdfsFile file)
+{
+ if (!file) {
+ return -1;
+ }
+ return file->offset;
+}
+
+char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize)
+{
+ if (fs == NULL || buffer == NULL || bufferSize <= 0) {
+ return NULL;
+ }
+
+ struct hdfsBuilder * bld = (struct hdfsBuilder *) fs;
+ if (bld->workingDir) {
+ strncpy(buffer, bld->workingDir, bufferSize);
+ }
+ return buffer;
+}
+
+int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
+{
+ if (fs == NULL || path == NULL) {
+ return -1;
+ }
+
+ struct hdfsBuilder * bld = (struct hdfsBuilder *) fs;
+ free(bld->workingDir);
+ bld->workingDir = (char *)malloc(strlen(path) + 1);
+ if (!(bld->workingDir)) {
+ return -1;
+ }
+ strcpy(bld->workingDir, path);
+ return 0;
+}
+
+void hdfsFreeHosts(char ***blockHosts)
+{
+ int i, j;
+ for (i=0; blockHosts[i]; i++) {
+ for (j=0; blockHosts[i][j]; j++) {
+ free(blockHosts[i][j]);
+ }
+ free(blockHosts[i]);
+ }
+ free(blockHosts);
+}
+
+/* not useful for libwebhdfs */
+int hdfsFileUsesDirectRead(hdfsFile file)
+{
+ /* return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ); */
+ fprintf(stderr, "hdfsFileUsesDirectRead is no longer useful for libwebhdfs.\n");
+ return -1;
+}
+
+/* not useful for libwebhdfs */
+void hdfsFileDisableDirectRead(hdfsFile file)
+{
+ /* file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; */
+ fprintf(stderr, "hdfsFileDisableDirectRead is no longer useful for libwebhdfs.\n");
+}
+
+/* not useful for libwebhdfs */
+int hdfsHFlush(hdfsFS fs, hdfsFile file)
+{
+ return 0;
+}
+
+/* not useful for libwebhdfs */
+int hdfsFlush(hdfsFS fs, hdfsFile file)
+{
+ return 0;
+}
+
+char*** hdfsGetHosts(hdfsFS fs, const char* path,
+ tOffset start, tOffset length)
+{
+ fprintf(stderr, "hdfsGetHosts is not but will be supported by libwebhdfs yet.\n");
+ return NULL;
+}
+
+tOffset hdfsGetCapacity(hdfsFS fs)
+{
+ fprintf(stderr, "hdfsGetCapacity is not but will be supported by libwebhdfs.\n");
+ return -1;
+}
+
+tOffset hdfsGetUsed(hdfsFS fs)
+{
+ fprintf(stderr, "hdfsGetUsed is not but will be supported by libwebhdfs yet.\n");
+ return -1;
+}
+
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,609 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#include "config.h"
+#include "exception.h"
+#include "jni_helper.h"
+
+#include <stdio.h>
+#include <string.h>
+
+static pthread_mutex_t hdfsHashMutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t jvmMutex = PTHREAD_MUTEX_INITIALIZER;
+static volatile int hashTableInited = 0;
+
+#define LOCK_HASH_TABLE() pthread_mutex_lock(&hdfsHashMutex)
+#define UNLOCK_HASH_TABLE() pthread_mutex_unlock(&hdfsHashMutex)
+
+
+/** The Native return types that methods could return */
+#define VOID 'V'
+#define JOBJECT 'L'
+#define JARRAYOBJECT '['
+#define JBOOLEAN 'Z'
+#define JBYTE 'B'
+#define JCHAR 'C'
+#define JSHORT 'S'
+#define JINT 'I'
+#define JLONG 'J'
+#define JFLOAT 'F'
+#define JDOUBLE 'D'
+
+
+/**
+ * MAX_HASH_TABLE_ELEM: The maximum no. of entries in the hashtable.
+ * It's set to 4096 to account for (classNames + No. of threads)
+ */
+#define MAX_HASH_TABLE_ELEM 4096
+
+/** Key that allows us to retrieve thread-local storage */
+static pthread_key_t gTlsKey;
+
+/** nonzero if we succeeded in initializing gTlsKey. Protected by the jvmMutex */
+static int gTlsKeyInitialized = 0;
+
+/** Pthreads thread-local storage for each library thread. */
+struct hdfsTls {
+ JNIEnv *env;
+};
+
+/**
+ * The function that is called whenever a thread with libhdfs thread local data
+ * is destroyed.
+ *
+ * @param v The thread-local data
+ */
+static void hdfsThreadDestructor(void *v)
+{
+ struct hdfsTls *tls = v;
+ JavaVM *vm;
+ JNIEnv *env = tls->env;
+ jint ret;
+
+ ret = (*env)->GetJavaVM(env, &vm);
+ if (ret) {
+ fprintf(stderr, "hdfsThreadDestructor: GetJavaVM failed with "
+ "error %d\n", ret);
+ (*env)->ExceptionDescribe(env);
+ } else {
+ (*vm)->DetachCurrentThread(vm);
+ }
+ free(tls);
+}
+
+void destroyLocalReference(JNIEnv *env, jobject jObject)
+{
+ if (jObject)
+ (*env)->DeleteLocalRef(env, jObject);
+}
+
+static jthrowable validateMethodType(JNIEnv *env, MethType methType)
+{
+ if (methType != STATIC && methType != INSTANCE) {
+ return newRuntimeError(env, "validateMethodType(methType=%d): "
+ "illegal method type.\n", methType);
+ }
+ return NULL;
+}
+
+jthrowable newJavaStr(JNIEnv *env, const char *str, jstring *out)
+{
+ jstring jstr;
+
+ if (!str) {
+ /* Can't pass NULL to NewStringUTF: the result would be
+ * implementation-defined. */
+ *out = NULL;
+ return NULL;
+ }
+ jstr = (*env)->NewStringUTF(env, str);
+ if (!jstr) {
+ /* If NewStringUTF returns NULL, an exception has been thrown,
+ * which we need to handle. Probaly an OOM. */
+ return getPendingExceptionAndClear(env);
+ }
+ *out = jstr;
+ return NULL;
+}
+
+jthrowable newCStr(JNIEnv *env, jstring jstr, char **out)
+{
+ const char *tmp;
+
+ if (!jstr) {
+ *out = NULL;
+ return NULL;
+ }
+ tmp = (*env)->GetStringUTFChars(env, jstr, NULL);
+ if (!tmp) {
+ return getPendingExceptionAndClear(env);
+ }
+ *out = strdup(tmp);
+ (*env)->ReleaseStringUTFChars(env, jstr, tmp);
+ return NULL;
+}
+
+static int hashTableInit(void)
+{
+ if (!hashTableInited) {
+ LOCK_HASH_TABLE();
+ if (!hashTableInited) {
+ if (hcreate(MAX_HASH_TABLE_ELEM) == 0) {
+ fprintf(stderr, "error creating hashtable, <%d>: %s\n",
+ errno, strerror(errno));
+ return 0;
+ }
+ hashTableInited = 1;
+ }
+ UNLOCK_HASH_TABLE();
+ }
+ return 1;
+}
+
+
+static int insertEntryIntoTable(const char *key, void *data)
+{
+ ENTRY e, *ep;
+ if (key == NULL || data == NULL) {
+ return 0;
+ }
+ if (! hashTableInit()) {
+ return -1;
+ }
+ e.data = data;
+ e.key = (char*)key;
+ LOCK_HASH_TABLE();
+ ep = hsearch(e, ENTER);
+ UNLOCK_HASH_TABLE();
+ if (ep == NULL) {
+ fprintf(stderr, "warn adding key (%s) to hash table, <%d>: %s\n",
+ key, errno, strerror(errno));
+ }
+ return 0;
+}
+
+
+
+static void* searchEntryFromTable(const char *key)
+{
+ ENTRY e,*ep;
+ if (key == NULL) {
+ return NULL;
+ }
+ hashTableInit();
+ e.key = (char*)key;
+ LOCK_HASH_TABLE();
+ ep = hsearch(e, FIND);
+ UNLOCK_HASH_TABLE();
+ if (ep != NULL) {
+ return ep->data;
+ }
+ return NULL;
+}
+
+
+
+jthrowable invokeMethod(JNIEnv *env, jvalue *retval, MethType methType,
+ jobject instObj, const char *className,
+ const char *methName, const char *methSignature, ...)
+{
+ va_list args;
+ jclass cls;
+ jmethodID mid;
+ jthrowable jthr;
+ const char *str;
+ char returnType;
+
+ jthr = validateMethodType(env, methType);
+ if (jthr)
+ return jthr;
+ jthr = globalClassReference(className, env, &cls);
+ if (jthr)
+ return jthr;
+ jthr = methodIdFromClass(className, methName, methSignature,
+ methType, env, &mid);
+ if (jthr)
+ return jthr;
+ str = methSignature;
+ while (*str != ')') str++;
+ str++;
+ returnType = *str;
+ va_start(args, methSignature);
+ if (returnType == JOBJECT || returnType == JARRAYOBJECT) {
+ jobject jobj = NULL;
+ if (methType == STATIC) {
+ jobj = (*env)->CallStaticObjectMethodV(env, cls, mid, args);
+ }
+ else if (methType == INSTANCE) {
+ jobj = (*env)->CallObjectMethodV(env, instObj, mid, args);
+ }
+ retval->l = jobj;
+ }
+ else if (returnType == VOID) {
+ if (methType == STATIC) {
+ (*env)->CallStaticVoidMethodV(env, cls, mid, args);
+ }
+ else if (methType == INSTANCE) {
+ (*env)->CallVoidMethodV(env, instObj, mid, args);
+ }
+ }
+ else if (returnType == JBOOLEAN) {
+ jboolean jbool = 0;
+ if (methType == STATIC) {
+ jbool = (*env)->CallStaticBooleanMethodV(env, cls, mid, args);
+ }
+ else if (methType == INSTANCE) {
+ jbool = (*env)->CallBooleanMethodV(env, instObj, mid, args);
+ }
+ retval->z = jbool;
+ }
+ else if (returnType == JSHORT) {
+ jshort js = 0;
+ if (methType == STATIC) {
+ js = (*env)->CallStaticShortMethodV(env, cls, mid, args);
+ }
+ else if (methType == INSTANCE) {
+ js = (*env)->CallShortMethodV(env, instObj, mid, args);
+ }
+ retval->s = js;
+ }
+ else if (returnType == JLONG) {
+ jlong jl = -1;
+ if (methType == STATIC) {
+ jl = (*env)->CallStaticLongMethodV(env, cls, mid, args);
+ }
+ else if (methType == INSTANCE) {
+ jl = (*env)->CallLongMethodV(env, instObj, mid, args);
+ }
+ retval->j = jl;
+ }
+ else if (returnType == JINT) {
+ jint ji = -1;
+ if (methType == STATIC) {
+ ji = (*env)->CallStaticIntMethodV(env, cls, mid, args);
+ }
+ else if (methType == INSTANCE) {
+ ji = (*env)->CallIntMethodV(env, instObj, mid, args);
+ }
+ retval->i = ji;
+ }
+ va_end(args);
+
+ jthr = (*env)->ExceptionOccurred(env);
+ if (jthr) {
+ (*env)->ExceptionClear(env);
+ return jthr;
+ }
+ return NULL;
+}
+
+jthrowable constructNewObjectOfClass(JNIEnv *env, jobject *out, const char *className,
+ const char *ctorSignature, ...)
+{
+ va_list args;
+ jclass cls;
+ jmethodID mid;
+ jobject jobj;
+ jthrowable jthr;
+
+ jthr = globalClassReference(className, env, &cls);
+ if (jthr)
+ return jthr;
+ jthr = methodIdFromClass(className, "<init>", ctorSignature,
+ INSTANCE, env, &mid);
+ if (jthr)
+ return jthr;
+ va_start(args, ctorSignature);
+ jobj = (*env)->NewObjectV(env, cls, mid, args);
+ va_end(args);
+ if (!jobj)
+ return getPendingExceptionAndClear(env);
+ *out = jobj;
+ return NULL;
+}
+
+
+jthrowable methodIdFromClass(const char *className, const char *methName,
+ const char *methSignature, MethType methType,
+ JNIEnv *env, jmethodID *out)
+{
+ jclass cls;
+ jthrowable jthr;
+
+ jthr = globalClassReference(className, env, &cls);
+ if (jthr)
+ return jthr;
+ jmethodID mid = 0;
+ jthr = validateMethodType(env, methType);
+ if (jthr)
+ return jthr;
+ if (methType == STATIC) {
+ mid = (*env)->GetStaticMethodID(env, cls, methName, methSignature);
+ }
+ else if (methType == INSTANCE) {
+ mid = (*env)->GetMethodID(env, cls, methName, methSignature);
+ }
+ if (mid == NULL) {
+ fprintf(stderr, "could not find method %s from class %s with "
+ "signature %s\n", methName, className, methSignature);
+ return getPendingExceptionAndClear(env);
+ }
+ *out = mid;
+ return NULL;
+}
+
+jthrowable globalClassReference(const char *className, JNIEnv *env, jclass *out)
+{
+ jclass clsLocalRef;
+ jclass cls = searchEntryFromTable(className);
+ if (cls) {
+ *out = cls;
+ return NULL;
+ }
+ clsLocalRef = (*env)->FindClass(env,className);
+ if (clsLocalRef == NULL) {
+ return getPendingExceptionAndClear(env);
+ }
+ cls = (*env)->NewGlobalRef(env, clsLocalRef);
+ if (cls == NULL) {
+ (*env)->DeleteLocalRef(env, clsLocalRef);
+ return getPendingExceptionAndClear(env);
+ }
+ (*env)->DeleteLocalRef(env, clsLocalRef);
+ insertEntryIntoTable(className, cls);
+ *out = cls;
+ return NULL;
+}
+
+jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name)
+{
+ jthrowable jthr;
+ jclass cls, clsClass = NULL;
+ jmethodID mid;
+ jstring str = NULL;
+ const char *cstr = NULL;
+ char *newstr;
+
+ cls = (*env)->GetObjectClass(env, jobj);
+ if (cls == NULL) {
+ jthr = getPendingExceptionAndClear(env);
+ goto done;
+ }
+ clsClass = (*env)->FindClass(env, "java/lang/Class");
+ if (clsClass == NULL) {
+ jthr = getPendingExceptionAndClear(env);
+ goto done;
+ }
+ mid = (*env)->GetMethodID(env, clsClass, "getName", "()Ljava/lang/String;");
+ if (mid == NULL) {
+ jthr = getPendingExceptionAndClear(env);
+ goto done;
+ }
+ str = (*env)->CallObjectMethod(env, cls, mid);
+ if (str == NULL) {
+ jthr = getPendingExceptionAndClear(env);
+ goto done;
+ }
+ cstr = (*env)->GetStringUTFChars(env, str, NULL);
+ if (!cstr) {
+ jthr = getPendingExceptionAndClear(env);
+ goto done;
+ }
+ newstr = strdup(cstr);
+ if (newstr == NULL) {
+ jthr = newRuntimeError(env, "classNameOfObject: out of memory");
+ goto done;
+ }
+ *name = newstr;
+ jthr = NULL;
+
+done:
+ destroyLocalReference(env, cls);
+ destroyLocalReference(env, clsClass);
+ if (str) {
+ if (cstr)
+ (*env)->ReleaseStringUTFChars(env, str, cstr);
+ (*env)->DeleteLocalRef(env, str);
+ }
+ return jthr;
+}
+
+
+/**
+ * Get the global JNI environemnt.
+ *
+ * We only have to create the JVM once. After that, we can use it in
+ * every thread. You must be holding the jvmMutex when you call this
+ * function.
+ *
+ * @return The JNIEnv on success; error code otherwise
+ */
+static JNIEnv* getGlobalJNIEnv(void)
+{
+ const jsize vmBufLength = 1;
+ JavaVM* vmBuf[vmBufLength];
+ JNIEnv *env;
+ jint rv = 0;
+ jint noVMs = 0;
+ jthrowable jthr;
+
+ rv = JNI_GetCreatedJavaVMs(&(vmBuf[0]), vmBufLength, &noVMs);
+ if (rv != 0) {
+ fprintf(stderr, "JNI_GetCreatedJavaVMs failed with error: %d\n", rv);
+ return NULL;
+ }
+
+ if (noVMs == 0) {
+ //Get the environment variables for initializing the JVM
+ char *hadoopClassPath = getenv("CLASSPATH");
+ if (hadoopClassPath == NULL) {
+ fprintf(stderr, "Environment variable CLASSPATH not set!\n");
+ return NULL;
+ }
+ char *hadoopClassPathVMArg = "-Djava.class.path=";
+ size_t optHadoopClassPathLen = strlen(hadoopClassPath) +
+ strlen(hadoopClassPathVMArg) + 1;
+ char *optHadoopClassPath = malloc(sizeof(char)*optHadoopClassPathLen);
+ snprintf(optHadoopClassPath, optHadoopClassPathLen,
+ "%s%s", hadoopClassPathVMArg, hadoopClassPath);
+
+ // Determine the # of LIBHDFS_OPTS args
+ int noArgs = 1;
+ char *hadoopJvmArgs = getenv("LIBHDFS_OPTS");
+ char jvmArgDelims[] = " ";
+ char *str, *token, *savePtr;
+ if (hadoopJvmArgs != NULL) {
+ hadoopJvmArgs = strdup(hadoopJvmArgs);
+ for (noArgs = 1, str = hadoopJvmArgs; ; noArgs++, str = NULL) {
+ token = strtok_r(str, jvmArgDelims, &savePtr);
+ if (NULL == token) {
+ break;
+ }
+ }
+ free(hadoopJvmArgs);
+ }
+
+ // Now that we know the # args, populate the options array
+ JavaVMOption options[noArgs];
+ options[0].optionString = optHadoopClassPath;
+ hadoopJvmArgs = getenv("LIBHDFS_OPTS");
+ if (hadoopJvmArgs != NULL) {
+ hadoopJvmArgs = strdup(hadoopJvmArgs);
+ for (noArgs = 1, str = hadoopJvmArgs; ; noArgs++, str = NULL) {
+ token = strtok_r(str, jvmArgDelims, &savePtr);
+ if (NULL == token) {
+ break;
+ }
+ options[noArgs].optionString = token;
+ }
+ }
+
+ //Create the VM
+ JavaVMInitArgs vm_args;
+ JavaVM *vm;
+ vm_args.version = JNI_VERSION_1_2;
+ vm_args.options = options;
+ vm_args.nOptions = noArgs;
+ vm_args.ignoreUnrecognized = 1;
+
+ rv = JNI_CreateJavaVM(&vm, (void*)&env, &vm_args);
+
+ if (hadoopJvmArgs != NULL) {
+ free(hadoopJvmArgs);
+ }
+ free(optHadoopClassPath);
+
+ if (rv != 0) {
+ fprintf(stderr, "Call to JNI_CreateJavaVM failed "
+ "with error: %d\n", rv);
+ return NULL;
+ }
+ jthr = invokeMethod(env, NULL, STATIC, NULL,
+ "org/apache/hadoop/fs/FileSystem",
+ "loadFileSystems", "()V");
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "loadFileSystems");
+ }
+ }
+ else {
+ //Attach this thread to the VM
+ JavaVM* vm = vmBuf[0];
+ rv = (*vm)->AttachCurrentThread(vm, (void*)&env, 0);
+ if (rv != 0) {
+ fprintf(stderr, "Call to AttachCurrentThread "
+ "failed with error: %d\n", rv);
+ return NULL;
+ }
+ }
+
+ return env;
+}
+
+/**
+ * getJNIEnv: A helper function to get the JNIEnv* for the given thread.
+ * If no JVM exists, then one will be created. JVM command line arguments
+ * are obtained from the LIBHDFS_OPTS environment variable.
+ *
+ * Implementation note: we rely on POSIX thread-local storage (tls).
+ * This allows us to associate a destructor function with each thread, that
+ * will detach the thread from the Java VM when the thread terminates. If we
+ * failt to do this, it will cause a memory leak.
+ *
+ * However, POSIX TLS is not the most efficient way to do things. It requires a
+ * key to be initialized before it can be used. Since we don't know if this key
+ * is initialized at the start of this function, we have to lock a mutex first
+ * and check. Luckily, most operating systems support the more efficient
+ * __thread construct, which is initialized by the linker.
+ *
+ * @param: None.
+ * @return The JNIEnv* corresponding to the thread.
+ */
+JNIEnv* getJNIEnv(void)
+{
+ JNIEnv *env;
+ struct hdfsTls *tls;
+ int ret;
+
+#ifdef HAVE_BETTER_TLS
+ static __thread struct hdfsTls *quickTls = NULL;
+ if (quickTls)
+ return quickTls->env;
+#endif
+ pthread_mutex_lock(&jvmMutex);
+ if (!gTlsKeyInitialized) {
+ ret = pthread_key_create(&gTlsKey, hdfsThreadDestructor);
+ if (ret) {
+ pthread_mutex_unlock(&jvmMutex);
+ fprintf(stderr, "getJNIEnv: pthread_key_create failed with "
+ "error %d\n", ret);
+ return NULL;
+ }
+ gTlsKeyInitialized = 1;
+ }
+ tls = pthread_getspecific(gTlsKey);
+ if (tls) {
+ pthread_mutex_unlock(&jvmMutex);
+ return tls->env;
+ }
+
+ env = getGlobalJNIEnv();
+ pthread_mutex_unlock(&jvmMutex);
+ if (!env) {
+ fprintf(stderr, "getJNIEnv: getGlobalJNIEnv failed\n");
+ return NULL;
+ }
+ tls = calloc(1, sizeof(struct hdfsTls));
+ if (!tls) {
+ fprintf(stderr, "getJNIEnv: OOM allocating %zd bytes\n",
+ sizeof(struct hdfsTls));
+ return NULL;
+ }
+ tls->env = env;
+ ret = pthread_setspecific(gTlsKey, tls);
+ if (ret) {
+ fprintf(stderr, "getJNIEnv: pthread_setspecific failed with "
+ "error code %d\n", ret);
+ hdfsThreadDestructor(tls);
+ return NULL;
+ }
+#ifdef HAVE_BETTER_TLS
+ quickTls = tls;
+#endif
+ return env;
+}
+
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFS_JNI_HELPER_H
+#define LIBHDFS_JNI_HELPER_H
+
+#include <jni.h>
+#include <stdio.h>
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <search.h>
+#include <pthread.h>
+#include <errno.h>
+
+#define PATH_SEPARATOR ':'
+
+
+/** Denote the method we want to invoke as STATIC or INSTANCE */
+typedef enum {
+ STATIC,
+ INSTANCE
+} MethType;
+
+/**
+ * Create a new malloc'ed C string from a Java string.
+ *
+ * @param env The JNI environment
+ * @param jstr The Java string
+ * @param out (out param) the malloc'ed C string
+ *
+ * @return NULL on success; the exception otherwise
+ */
+jthrowable newCStr(JNIEnv *env, jstring jstr, char **out);
+
+/**
+ * Create a new Java string from a C string.
+ *
+ * @param env The JNI environment
+ * @param str The C string
+ * @param out (out param) the java string
+ *
+ * @return NULL on success; the exception otherwise
+ */
+jthrowable newJavaStr(JNIEnv *env, const char *str, jstring *out);
+
+/**
+ * Helper function to destroy a local reference of java.lang.Object
+ * @param env: The JNIEnv pointer.
+ * @param jFile: The local reference of java.lang.Object object
+ * @return None.
+ */
+void destroyLocalReference(JNIEnv *env, jobject jObject);
+
+/** invokeMethod: Invoke a Static or Instance method.
+ * className: Name of the class where the method can be found
+ * methName: Name of the method
+ * methSignature: the signature of the method "(arg-types)ret-type"
+ * methType: The type of the method (STATIC or INSTANCE)
+ * instObj: Required if the methType is INSTANCE. The object to invoke
+ the method on.
+ * env: The JNIEnv pointer
+ * retval: The pointer to a union type which will contain the result of the
+ method invocation, e.g. if the method returns an Object, retval will be
+ set to that, if the method returns boolean, retval will be set to the
+ value (JNI_TRUE or JNI_FALSE), etc.
+ * exc: If the methods throws any exception, this will contain the reference
+ * Arguments (the method arguments) must be passed after methSignature
+ * RETURNS: -1 on error and 0 on success. If -1 is returned, exc will have
+ a valid exception reference, and the result stored at retval is undefined.
+ */
+jthrowable invokeMethod(JNIEnv *env, jvalue *retval, MethType methType,
+ jobject instObj, const char *className, const char *methName,
+ const char *methSignature, ...);
+
+jthrowable constructNewObjectOfClass(JNIEnv *env, jobject *out, const char *className,
+ const char *ctorSignature, ...);
+
+jthrowable methodIdFromClass(const char *className, const char *methName,
+ const char *methSignature, MethType methType,
+ JNIEnv *env, jmethodID *out);
+
+jthrowable globalClassReference(const char *className, JNIEnv *env, jclass *out);
+
+/** classNameOfObject: Get an object's class name.
+ * @param jobj: The object.
+ * @param env: The JNIEnv pointer.
+ * @param name: (out param) On success, will contain a string containing the
+ * class name. This string must be freed by the caller.
+ * @return NULL on success, or the exception
+ */
+jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name);
+
+/** getJNIEnv: A helper function to get the JNIEnv* for the given thread.
+ * If no JVM exists, then one will be created. JVM command line arguments
+ * are obtained from the LIBHDFS_OPTS environment variable.
+ * @param: None.
+ * @return The JNIEnv* corresponding to the thread.
+ * */
+JNIEnv* getJNIEnv(void);
+
+#endif /*LIBHDFS_JNI_HELPER_H*/
+
+/**
+ * vim: ts=4: sw=4: et:
+ */
+
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "webhdfs.h"
+
+#include <errno.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "hdfs_http_client.h"
+#include "hdfs_http_query.h"
+#include "hdfs_json_parser.h"
+#include <unistd.h>
+#include <curl/curl.h>
+
+#define TLH_MAX_THREADS 100
+
+static sem_t *tlhSem;
+
+static const char *nn;
+static const char *user;
+static int port;
+
+static const char *fileName = "/tmp/tlhData";
+
+struct tlhThreadInfo {
+ /** Thread index */
+ int threadIdx;
+ /** 0 = thread was successful; error code otherwise */
+ int success;
+ /** pthread identifier */
+ pthread_t thread;
+};
+
+static int hdfsSingleNameNodeConnect(const char *nn, int port, const char *user, hdfsFS *fs)
+{
+ hdfsFS hdfs;
+ if (port < 0) {
+ fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
+ "returned error %d\n", port);
+ return port;
+ }
+
+ hdfs = hdfsConnectAsUserNewInstance(nn, port, user);
+ if (!hdfs) {
+ return -errno;
+ }
+ *fs = hdfs;
+ return 0;
+}
+
+static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
+{
+ hdfsFile file;
+ int ret = 0;
+ char buffer[1024 * (ti->threadIdx + 1)];
+ memset(buffer, 'a', sizeof(buffer));
+
+ file = hdfsOpenFile(fs, "/tmp/thread_test.txt", O_WRONLY, 0, 0, 0);
+ sleep(1);
+ hdfsCloseFile(fs, file);
+ return ret;
+}
+
+static void *testHdfsOperations(void *v)
+{
+ struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
+ hdfsFS fs = NULL;
+ int ret;
+
+ fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
+ ti->threadIdx);
+ ret = hdfsSingleNameNodeConnect(nn, port, user, &fs);
+ if (ret) {
+ fprintf(stderr, "testHdfsOperations(threadIdx=%d): "
+ "hdfsSingleNameNodeConnect failed with error %d.\n",
+ ti->threadIdx, ret);
+ ti->success = EIO;
+ return NULL;
+ }
+ ti->success = doTestHdfsOperations(ti, fs);
+ if (hdfsDisconnect(fs)) {
+ ret = errno;
+ fprintf(stderr, "hdfsDisconnect error %d\n", ret);
+ ti->success = ret;
+ }
+ return NULL;
+}
+
+static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
+{
+ int i, threadsFailed = 0;
+ const char *sep = "";
+
+ for (i = 0; i < tlhNumThreads; i++) {
+ if (ti[i].success != 0) {
+ threadsFailed = 1;
+ }
+ }
+ if (!threadsFailed) {
+ fprintf(stderr, "testLibHdfs: all threads succeeded. SUCCESS.\n");
+ return EXIT_SUCCESS;
+ }
+ fprintf(stderr, "testLibHdfs: some threads failed: [");
+ for (i = 0; i < tlhNumThreads; i++) {
+ if (ti[i].success != 0) {
+ fprintf(stderr, "%s%d", sep, i);
+ sep = ", ";
+ }
+ }
+ fprintf(stderr, "]. FAILURE.\n");
+ return EXIT_FAILURE;
+}
+
+/**
+ * Test that we can write a file with libhdfs and then read it back
+ */
+int main(int argc, const char *args[])
+{
+ if (argc != 4) {
+ fprintf(stderr, "usage: test_libhdfs_threaded <namenode> <port> <username>");
+ return -1;
+ }
+
+ nn = args[1];
+ port = atoi(args[2]);
+ user = args[3];
+
+ int i, tlhNumThreads;
+ const char *tlhNumThreadsStr;
+ struct tlhThreadInfo ti[TLH_MAX_THREADS];
+
+ tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
+ if (!tlhNumThreadsStr) {
+ tlhNumThreadsStr = "3";
+ }
+ tlhNumThreads = atoi(tlhNumThreadsStr);
+ if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
+ fprintf(stderr, "testLibHdfs: must have a number of threads "
+ "between 1 and %d inclusive, not %d\n",
+ TLH_MAX_THREADS, tlhNumThreads);
+ return EXIT_FAILURE;
+ }
+ memset(&ti[0], 0, sizeof(ti));
+ for (i = 0; i < tlhNumThreads; i++) {
+ ti[i].threadIdx = i;
+ }
+
+ tlhSem = sem_open("sem", O_CREAT, 0644, tlhNumThreads);
+
+ for (i = 0; i < tlhNumThreads; i++) {
+ fprintf(stderr, "\ncreating thread %d\n", i);
+ EXPECT_ZERO(pthread_create(&ti[i].thread, NULL,
+ testHdfsOperations, &ti[i]));
+ }
+ for (i = 0; i < tlhNumThreads; i++) {
+ EXPECT_ZERO(pthread_join(ti[i].thread, NULL));
+ }
+
+ EXPECT_ZERO(sem_close(tlhSem));
+ return checkFailures(ti, tlhNumThreads);
+}