You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/09/10 15:43:29 UTC
svn commit: r1382836 [1/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/
src/contrib/libwebhdfs/ src/contrib/libwebhdfs/resources/
src/contrib/libwebhdfs/src/
Author: szetszwo
Date: Mon Sep 10 13:43:28 2012
New Revision: 1382836
URL: http://svn.apache.org/viewvc?rev=1382836&view=rev
Log:
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS. Contributed by Jaimin D Jetly and Jing Zhao
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/CMakeLists.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/resources/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/resources/FindJansson.cmake
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/expect.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_jni.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/jni_helper.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_multi_write.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_ops.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_read.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_threaded.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_libwebhdfs_write.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/test_read_bm.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/webhdfs.h
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1382836&r1=1382835&r2=1382836&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Sep 10 13:43:28 2012
@@ -221,6 +221,9 @@ Release 2.0.3-alpha - Unreleased
NEW FEATURES
+ HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
+ (Jaimin D Jetly and Jing Zhao via szetszwo)
+
IMPROVEMENTS
OPTIMIZATIONS
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1382836&r1=1382835&r2=1382836&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml Mon Sep 10 13:43:28 2012
@@ -35,6 +35,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
<kdc.resource.dir>../../hadoop-common-project/hadoop-common/src/test/resources/kdc</kdc.resource.dir>
<is.hadoop.component>true</is.hadoop.component>
<require.fuse>false</require.fuse>
+ <require.libwebhdfs>false</require.libwebhdfs>
</properties>
<dependencies>
@@ -472,7 +473,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
<mkdir dir="${project.build.directory}/native"/>
<exec executable="cmake" dir="${project.build.directory}/native"
failonerror="true">
- <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_FUSE=${require.fuse}"/>
+ <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_LIBWEBHDFS=${require.libwebhdfs} -DREQUIRE_FUSE=${require.fuse}"/>
</exec>
<exec executable="make" dir="${project.build.directory}/native" failonerror="true">
<arg line="VERBOSE=1"/>
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1382836&r1=1382835&r2=1382836&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Mon Sep 10 13:43:28 2012
@@ -147,4 +147,7 @@ target_link_libraries(test_libhdfs_threa
pthread
)
+IF(REQUIRE_LIBWEBHDFS)
+ add_subdirectory(contrib/libwebhdfs)
+ENDIF(REQUIRE_LIBWEBHDFS)
add_subdirectory(main/native/fuse-dfs)
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/CMakeLists.txt?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/CMakeLists.txt (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/CMakeLists.txt Mon Sep 10 13:43:28 2012
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+find_package(CURL)
+if (CURL_FOUND)
+ include_directories(${CURL_INCLUDE_DIRS})
+else (CURL_FOUND)
+ MESSAGE(STATUS "Failed to find CURL library.")
+endif (CURL_FOUND)
+
+set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
+"${CMAKE_SOURCE_DIR}/contrib/libwebhdfs/resources/")
+MESSAGE("CMAKE_MODULE_PATH IS: " ${CMAKE_MODULE_PATH})
+
+find_package(Jansson)
+include_directories(${JANSSON_INCLUDE_DIR})
+
+add_dual_library(webhdfs
+ src/exception.c
+ src/hdfs_web.c
+ src/hdfs_jni.c
+ src/jni_helper.c
+ src/hdfs_http_client.c
+ src/hdfs_http_query.c
+ src/hdfs_json_parser.c
+)
+target_link_dual_libraries(webhdfs
+ ${JAVA_JVM_LIBRARY}
+ ${CURL_LIBRARY}
+ ${JANSSON_LIBRARY}
+ pthread
+)
+dual_output_directory(webhdfs target)
+set(LIBWEBHDFS_VERSION "0.0.0")
+set_target_properties(webhdfs PROPERTIES
+ SOVERSION ${LIBWEBHDFS_VERSION})
+
+add_executable(test_libwebhdfs_ops
+ src/test_libwebhdfs_ops.c
+)
+target_link_libraries(test_libwebhdfs_ops
+ webhdfs
+ ${CURL_LIBRARY}
+ ${JAVA_JVM_LIBRARY}
+ ${JANSSON_LIBRARY}
+ pthread
+)
+
+add_executable(test_libwebhdfs_read
+ src/test_libwebhdfs_read.c
+)
+target_link_libraries(test_libwebhdfs_read
+ webhdfs
+ ${CURL_LIBRARY}
+ ${JAVA_JVM_LIBRARY}
+ ${JANSSON_LIBRARY}
+ pthread
+)
+
+add_executable(test_libwebhdfs_write
+ src/test_libwebhdfs_write.c
+)
+target_link_libraries(test_libwebhdfs_write
+ webhdfs
+ ${CURL_LIBRARY}
+ ${JAVA_JVM_LIBRARY}
+ ${JANSSON_LIBRARY}
+ pthread
+)
+
+add_executable(test_libwebhdfs_threaded
+ src/test_libwebhdfs_threaded.c
+)
+target_link_libraries(test_libwebhdfs_threaded
+ webhdfs
+ ${CURL_LIBRARY}
+ ${JAVA_JVM_LIBRARY}
+ ${JANSSON_LIBRARY}
+ pthread
+)
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/resources/FindJansson.cmake
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/resources/FindJansson.cmake?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/resources/FindJansson.cmake (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/resources/FindJansson.cmake Mon Sep 10 13:43:28 2012
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# - Try to find Jansson
+# Once done this will define
+# JANSSON_FOUND - System has Jansson
+# JANSSON_INCLUDE_DIRS - The Jansson include directories
+# JANSSON_LIBRARIES - The libraries needed to use Jansson
+# JANSSON_DEFINITIONS - Compiler switches required for using Jansson
+
+find_path(JANSSON_INCLUDE_DIR jansson.h
+ /usr/incluce
+ /usr/local/include )
+
+find_library(JANSSON_LIBRARY NAMES jansson
+ PATHS /usr/lib /usr/local/lib )
+
+set(JANSSON_LIBRARIES ${JANSSON_LIBRARY} )
+set(JANSSON_INCLUDE_DIRS ${JANSSON_INCLUDE_DIR} )
+
+include(FindPackageHandleStandardArgs)
+# handle the QUIETLY and REQUIRED arguments and set JANSSON_FOUND to TRUE
+# if all listed variables are TRUE
+find_package_handle_standard_args(Jansson DEFAULT_MSG
+ JANSSON_LIBRARY JANSSON_INCLUDE_DIR)
+
+mark_as_advanced(JANSSON_INCLUDE_DIR JANSSON_LIBRARY )
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "exception.h"
+#include "webhdfs.h"
+#include "jni_helper.h"
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define EXCEPTION_INFO_LEN (sizeof(gExceptionInfo)/sizeof(gExceptionInfo[0]))
+
+struct ExceptionInfo {
+ const char * const name;
+ int noPrintFlag;
+ int excErrno;
+};
+
+static const struct ExceptionInfo gExceptionInfo[] = {
+ {
+ .name = "java/io/FileNotFoundException",
+ .noPrintFlag = NOPRINT_EXC_FILE_NOT_FOUND,
+ .excErrno = ENOENT,
+ },
+ {
+ .name = "org/apache/hadoop/security/AccessControlException",
+ .noPrintFlag = NOPRINT_EXC_ACCESS_CONTROL,
+ .excErrno = EACCES,
+ },
+ {
+ .name = "org/apache/hadoop/fs/UnresolvedLinkException",
+ .noPrintFlag = NOPRINT_EXC_UNRESOLVED_LINK,
+ .excErrno = ENOLINK,
+ },
+ {
+ .name = "org/apache/hadoop/fs/ParentNotDirectoryException",
+ .noPrintFlag = NOPRINT_EXC_PARENT_NOT_DIRECTORY,
+ .excErrno = ENOTDIR,
+ },
+ {
+ .name = "java/lang/IllegalArgumentException",
+ .noPrintFlag = NOPRINT_EXC_ILLEGAL_ARGUMENT,
+ .excErrno = EINVAL,
+ },
+ {
+ .name = "java/lang/OutOfMemoryError",
+ .noPrintFlag = 0,
+ .excErrno = ENOMEM,
+ },
+
+};
+
+int printExceptionWebV(hdfs_exception_msg *exc, int noPrintFlags, const char *fmt, va_list ap)
+{
+ int i, noPrint, excErrno;
+ if (!exc) {
+ fprintf(stderr, "printExceptionWebV: the hdfs_exception_msg is NULL\n");
+ return EINTERNAL;
+ }
+
+ for (i = 0; i < EXCEPTION_INFO_LEN; i++) {
+ if (strstr(gExceptionInfo[i].name, exc->exception)) {
+ break;
+ }
+ }
+ if (i < EXCEPTION_INFO_LEN) {
+ noPrint = (gExceptionInfo[i].noPrintFlag & noPrintFlags);
+ excErrno = gExceptionInfo[i].excErrno;
+ } else {
+ noPrint = 0;
+ excErrno = EINTERNAL;
+ }
+
+ if (!noPrint) {
+ vfprintf(stderr, fmt, ap);
+ fprintf(stderr, " error:\n");
+ fprintf(stderr, "Exception: %s\nJavaClassName: %s\nMessage: %s\n", exc->exception, exc->javaClassName, exc->message);
+ }
+
+ free(exc);
+ return excErrno;
+}
+
+int printExceptionWeb(hdfs_exception_msg *exc, int noPrintFlags, const char *fmt, ...)
+{
+ va_list ap;
+ int ret;
+
+ va_start(ap, fmt);
+ ret = printExceptionWebV(exc, noPrintFlags, fmt, ap);
+ va_end(ap);
+ return ret;
+}
+
+int printExceptionAndFreeV(JNIEnv *env, jthrowable exc, int noPrintFlags,
+ const char *fmt, va_list ap)
+{
+ int i, noPrint, excErrno;
+ char *className = NULL;
+ jstring jStr = NULL;
+ jvalue jVal;
+ jthrowable jthr;
+
+ jthr = classNameOfObject(exc, env, &className);
+ if (jthr) {
+ fprintf(stderr, "PrintExceptionAndFree: error determining class name "
+ "of exception.\n");
+ className = strdup("(unknown)");
+ destroyLocalReference(env, jthr);
+ }
+ for (i = 0; i < EXCEPTION_INFO_LEN; i++) {
+ if (!strcmp(gExceptionInfo[i].name, className)) {
+ break;
+ }
+ }
+ if (i < EXCEPTION_INFO_LEN) {
+ noPrint = (gExceptionInfo[i].noPrintFlag & noPrintFlags);
+ excErrno = gExceptionInfo[i].excErrno;
+ } else {
+ noPrint = 0;
+ excErrno = EINTERNAL;
+ }
+ if (!noPrint) {
+ vfprintf(stderr, fmt, ap);
+ fprintf(stderr, " error:\n");
+
+ // We don't want to use ExceptionDescribe here, because that requires a
+ // pending exception. Instead, use ExceptionUtils.
+ jthr = invokeMethod(env, &jVal, STATIC, NULL,
+ "org/apache/commons/lang/exception/ExceptionUtils",
+ "getStackTrace", "(Ljava/lang/Throwable;)Ljava/lang/String;", exc);
+ if (jthr) {
+ fprintf(stderr, "(unable to get stack trace for %s exception: "
+ "ExceptionUtils::getStackTrace error.)\n", className);
+ destroyLocalReference(env, jthr);
+ } else {
+ jStr = jVal.l;
+ const char *stackTrace = (*env)->GetStringUTFChars(env, jStr, NULL);
+ if (!stackTrace) {
+ fprintf(stderr, "(unable to get stack trace for %s exception: "
+ "GetStringUTFChars error.)\n", className);
+ } else {
+ fprintf(stderr, "%s", stackTrace);
+ (*env)->ReleaseStringUTFChars(env, jStr, stackTrace);
+ }
+ }
+ }
+ destroyLocalReference(env, jStr);
+ destroyLocalReference(env, exc);
+ free(className);
+ return excErrno;
+}
+
+int printExceptionAndFree(JNIEnv *env, jthrowable exc, int noPrintFlags,
+ const char *fmt, ...)
+{
+ va_list ap;
+ int ret;
+
+ va_start(ap, fmt);
+ ret = printExceptionAndFreeV(env, exc, noPrintFlags, fmt, ap);
+ va_end(ap);
+ return ret;
+}
+
+int printPendingExceptionAndFree(JNIEnv *env, int noPrintFlags,
+ const char *fmt, ...)
+{
+ va_list ap;
+ int ret;
+ jthrowable exc;
+
+ exc = (*env)->ExceptionOccurred(env);
+ if (!exc) {
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, " error: (no exception)");
+ ret = 0;
+ } else {
+ (*env)->ExceptionClear(env);
+ va_start(ap, fmt);
+ ret = printExceptionAndFreeV(env, exc, noPrintFlags, fmt, ap);
+ va_end(ap);
+ }
+ return ret;
+}
+
+jthrowable getPendingExceptionAndClear(JNIEnv *env)
+{
+ jthrowable jthr = (*env)->ExceptionOccurred(env);
+ if (!jthr)
+ return NULL;
+ (*env)->ExceptionClear(env);
+ return jthr;
+}
+
+jthrowable newRuntimeError(JNIEnv *env, const char *fmt, ...)
+{
+ char buf[512];
+ jobject out, exc;
+ jstring jstr;
+ va_list ap;
+
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ jstr = (*env)->NewStringUTF(env, buf);
+ if (!jstr) {
+ // We got an out of memory exception rather than a RuntimeException.
+ // Too bad...
+ return getPendingExceptionAndClear(env);
+ }
+ exc = constructNewObjectOfClass(env, &out, "RuntimeException",
+ "(java/lang/String;)V", jstr);
+ (*env)->DeleteLocalRef(env, jstr);
+ // Again, we'll either get an out of memory exception or the
+ // RuntimeException we wanted.
+ return (exc) ? exc : out;
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/exception.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFS_EXCEPTION_H
+#define LIBHDFS_EXCEPTION_H
+
+/**
+ * Exception handling routines for libhdfs.
+ *
+ * The convention we follow here is to clear pending exceptions as soon as they
+ * are raised. Never assume that the caller of your function will clean up
+ * after you-- do it yourself. Unhandled exceptions can lead to memory leaks
+ * and other undefined behavior.
+ *
+ * If you encounter an exception, return a local reference to it. The caller is
+ * responsible for freeing the local reference, by calling a function like
+ * PrintExceptionAndFree. (You can also free exceptions directly by calling
+ * DeleteLocalRef. However, that would not produce an error message, so it's
+ * usually not what you want.)
+ */
+
+#include <jni.h>
+#include <stdio.h>
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <search.h>
+#include <pthread.h>
+#include <errno.h>
+
+/**
+ * Exception noprint flags
+ *
+ * Theses flags determine which exceptions should NOT be printed to stderr by
+ * the exception printing routines. For example, if you expect to see
+ * FileNotFound, you might use NOPRINT_EXC_FILE_NOT_FOUND, to avoid filling the
+ * logs with messages about routine events.
+ *
+ * On the other hand, if you don't expect any failures, you might pass
+ * PRINT_EXC_ALL.
+ *
+ * You can OR these flags together to avoid printing multiple classes of
+ * exceptions.
+ */
+#define PRINT_EXC_ALL 0x00
+#define NOPRINT_EXC_FILE_NOT_FOUND 0x01
+#define NOPRINT_EXC_ACCESS_CONTROL 0x02
+#define NOPRINT_EXC_UNRESOLVED_LINK 0x04
+#define NOPRINT_EXC_PARENT_NOT_DIRECTORY 0x08
+#define NOPRINT_EXC_ILLEGAL_ARGUMENT 0x10
+
+/**
+ * Exception information after calling webhdfs operations
+ */
+typedef struct {
+ const char *exception;
+ const char *javaClassName;
+ const char *message;
+} hdfs_exception_msg;
+
+/**
+ * Print out exception information got after calling webhdfs operations
+ *
+ * @param exc The exception information to print and free
+ * @param noPrintFlags Flags which determine which exceptions we should NOT
+ * print.
+ * @param fmt Printf-style format list
+ * @param ap Printf-style varargs
+ *
+ * @return The POSIX error number associated with the exception
+ * object.
+ */
+int printExceptionWebV(hdfs_exception_msg *exc, int noPrintFlags, const char *fmt, va_list ap);
+
+/**
+ * Print out exception information got after calling webhdfs operations
+ *
+ * @param exc The exception information to print and free
+ * @param noPrintFlags Flags which determine which exceptions we should NOT
+ * print.
+ * @param fmt Printf-style format list
+ * @param ... Printf-style varargs
+ *
+ * @return The POSIX error number associated with the exception
+ * object.
+ */
+int printExceptionWeb(hdfs_exception_msg *exc, int noPrintFlags,
+ const char *fmt, ...) __attribute__((format(printf, 3, 4)));
+
+/**
+ * Print out information about an exception and free it.
+ *
+ * @param env The JNI environment
+ * @param exc The exception to print and free
+ * @param noPrintFlags Flags which determine which exceptions we should NOT
+ * print.
+ * @param fmt Printf-style format list
+ * @param ap Printf-style varargs
+ *
+ * @return The POSIX error number associated with the exception
+ * object.
+ */
+int printExceptionAndFreeV(JNIEnv *env, jthrowable exc, int noPrintFlags,
+ const char *fmt, va_list ap);
+
+/**
+ * Print out information about an exception and free it.
+ *
+ * @param env The JNI environment
+ * @param exc The exception to print and free
+ * @param noPrintFlags Flags which determine which exceptions we should NOT
+ * print.
+ * @param fmt Printf-style format list
+ * @param ... Printf-style varargs
+ *
+ * @return The POSIX error number associated with the exception
+ * object.
+ */
+int printExceptionAndFree(JNIEnv *env, jthrowable exc, int noPrintFlags,
+ const char *fmt, ...) __attribute__((format(printf, 4, 5)));
+
+/**
+ * Print out information about the pending exception and free it.
+ *
+ * @param env The JNI environment
+ * @param noPrintFlags Flags which determine which exceptions we should NOT
+ * print.
+ * @param fmt Printf-style format list
+ * @param ... Printf-style varargs
+ *
+ * @return The POSIX error number associated with the exception
+ * object.
+ */
+int printPendingExceptionAndFree(JNIEnv *env, int noPrintFlags,
+ const char *fmt, ...) __attribute__((format(printf, 3, 4)));
+
+/**
+ * Get a local reference to the pending exception and clear it.
+ *
+ * Once it is cleared, the exception will no longer be pending. The caller will
+ * have to decide what to do with the exception object.
+ *
+ * @param env The JNI environment
+ *
+ * @return The exception, or NULL if there was no exception
+ */
+jthrowable getPendingExceptionAndClear(JNIEnv *env);
+
+/**
+ * Create a new runtime error.
+ *
+ * This creates (but does not throw) a new RuntimeError.
+ *
+ * @param env The JNI environment
+ * @param fmt Printf-style format list
+ * @param ... Printf-style varargs
+ *
+ * @return A local reference to a RuntimeError
+ */
+jthrowable newRuntimeError(JNIEnv *env, const char *fmt, ...)
+ __attribute__((format(printf, 2, 3)));
+
+#endif
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/expect.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/expect.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/expect.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
+#define LIBHDFS_NATIVE_TESTS_EXPECT_H
+
+#include <stdio.h>
+
+#define EXPECT_ZERO(x) \
+ do { \
+ int __my_ret__ = x; \
+ if (__my_ret__) { \
+ int __my_errno__ = errno; \
+ fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ "code %d (errno: %d): got nonzero from %s\n", \
+ __LINE__, __my_ret__, __my_errno__, #x); \
+ return __my_ret__; \
+ } \
+ } while (0);
+
+#define EXPECT_NULL(x) \
+ do { \
+ void* __my_ret__ = x; \
+ int __my_errno__ = errno; \
+ if (__my_ret__ != NULL) { \
+ fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
+ "got non-NULL value %p from %s\n", \
+ __LINE__, __my_errno__, __my_ret__, #x); \
+ return -1; \
+ } \
+ } while (0);
+
+#define EXPECT_NONNULL(x) \
+ do { \
+ void* __my_ret__ = x; \
+ int __my_errno__ = errno; \
+ if (__my_ret__ == NULL) { \
+ fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
+ "got NULL from %s\n", __LINE__, __my_errno__, #x); \
+ return -1; \
+ } \
+ } while (0);
+
+#define EXPECT_NEGATIVE_ONE_WITH_ERRNO(x, e) \
+ do { \
+ int __my_ret__ = x; \
+ int __my_errno__ = errno; \
+ if (__my_ret__ != -1) { \
+ fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ "code %d (errno: %d): expected -1 from %s\n", __LINE__, \
+ __my_ret__, __my_errno__, #x); \
+ return -1; \
+ } \
+ if (__my_errno__ != e) { \
+ fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ "code %d (errno: %d): expected errno = %d from %s\n", \
+ __LINE__, __my_ret__, __my_errno__, e, #x); \
+ return -1; \
+ } \
+ } while (0);
+
+#define EXPECT_NONZERO(x) \
+ do { \
+ int __my_ret__ = x; \
+ int __my_errno__ = errno; \
+ if (__my_ret__) { \
+ fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ "code %d (errno: %d): got zero from %s\n", __LINE__, \
+ __my_ret__, __my_errno__, #x); \
+ return -1; \
+ } \
+ } while (0);
+
+#define EXPECT_NONNEGATIVE(x) \
+ do { \
+ int __my_ret__ = x; \
+ int __my_errno__ = errno; \
+ if (__my_ret__ < 0) { \
+ fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ "code %d (errno: %d): got negative return from %s\n", \
+ __LINE__, __my_ret__, __my_errno__, #x); \
+ return __my_ret__; \
+ } \
+ } while (0);
+
+#endif
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#include <string.h>
+#include <curl/curl.h>
+#include <pthread.h>
+#include "hdfs_http_client.h"
+
+static pthread_mutex_t curlInitMutex = PTHREAD_MUTEX_INITIALIZER;
+static volatile int curlGlobalInited = 0;
+
+ResponseBuffer initResponseBuffer() {
+ ResponseBuffer info = (ResponseBuffer) calloc(1, sizeof(ResponseBufferInternal));
+ if (!info) {
+ fprintf(stderr, "Cannot allocate memory for responseInfo\n");
+ return NULL;
+ }
+ info->remaining = 0;
+ info->offset = 0;
+ info->content = NULL;
+ return info;
+}
+
+void freeResponseBuffer(ResponseBuffer buffer) {
+ if (buffer) {
+ if (buffer->content) {
+ free(buffer->content);
+ }
+ free(buffer);
+ buffer = NULL;
+ }
+}
+
+void freeResponse(Response resp) {
+ if(resp) {
+ freeResponseBuffer(resp->body);
+ freeResponseBuffer(resp->header);
+ free(resp);
+ resp = NULL;
+ }
+}
+
+/* Callback for allocating local buffer and reading data to local buffer */
+static size_t writefunc(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) {
+ if (size * nmemb < 1) {
+ return 0;
+ }
+ if (!rbuffer) {
+ fprintf(stderr, "In writefunc, ResponseBuffer is NULL.\n");
+ return -1;
+ }
+
+ if (rbuffer->remaining < size * nmemb) {
+ rbuffer->content = realloc(rbuffer->content, rbuffer->offset + size * nmemb + 1);
+ if (rbuffer->content == NULL) {
+ return -1;
+ }
+ rbuffer->remaining = size * nmemb;
+ }
+ memcpy(rbuffer->content + rbuffer->offset, ptr, size * nmemb);
+ rbuffer->offset += size * nmemb;
+ (rbuffer->content)[rbuffer->offset] = '\0';
+ rbuffer->remaining -= size * nmemb;
+ return size * nmemb;
+}
+
+/**
+ * Callback for reading data to buffer provided by user,
+ * thus no need to reallocate buffer.
+ */
+static size_t writefunc_withbuffer(void *ptr, size_t size, size_t nmemb, ResponseBuffer rbuffer) {
+ if (size * nmemb < 1) {
+ return 0;
+ }
+ if (!rbuffer || !rbuffer->content) {
+ fprintf(stderr, "In writefunc_withbuffer, the buffer provided by user is NULL.\n");
+ return 0;
+ }
+
+ size_t toCopy = rbuffer->remaining < (size * nmemb) ? rbuffer->remaining : (size * nmemb);
+ memcpy(rbuffer->content + rbuffer->offset, ptr, toCopy);
+ rbuffer->offset += toCopy;
+ rbuffer->remaining -= toCopy;
+ return toCopy;
+}
+
+//callback for writing data to remote peer
+static size_t readfunc(void *ptr, size_t size, size_t nmemb, void *stream) {
+ if (size * nmemb < 1) {
+ fprintf(stderr, "In readfunc callback: size * nmemb == %ld\n", size * nmemb);
+ return 0;
+ }
+ webhdfsBuffer *wbuffer = (webhdfsBuffer *) stream;
+
+ pthread_mutex_lock(&wbuffer->writeMutex);
+ while (wbuffer->remaining == 0) {
+ /*
+ * the current remainning bytes to write is 0,
+ * check whether need to finish the transfer
+ * if yes, return 0; else, wait
+ */
+ if (wbuffer->closeFlag) {
+ //we can close the transfer now
+ fprintf(stderr, "CloseFlag is set, ready to close the transfer\n");
+ pthread_mutex_unlock(&wbuffer->writeMutex);
+ return 0;
+ } else {
+ // len == 0 indicates that user's buffer has been transferred
+ pthread_cond_signal(&wbuffer->transfer_finish);
+ pthread_cond_wait(&wbuffer->newwrite_or_close, &wbuffer->writeMutex);
+ }
+ }
+
+ if(wbuffer->remaining > 0 && !wbuffer->closeFlag) {
+ size_t copySize = wbuffer->remaining < size * nmemb ? wbuffer->remaining : size * nmemb;
+ memcpy(ptr, wbuffer->wbuffer + wbuffer->offset, copySize);
+ wbuffer->offset += copySize;
+ wbuffer->remaining -= copySize;
+ pthread_mutex_unlock(&wbuffer->writeMutex);
+ return copySize;
+ } else {
+ fprintf(stderr, "Webhdfs buffer is %ld, it should be a positive value!\n", wbuffer->remaining);
+ pthread_mutex_unlock(&wbuffer->writeMutex);
+ return 0;
+ }
+}
+
+static void initCurlGlobal() {
+ if (!curlGlobalInited) {
+ pthread_mutex_lock(&curlInitMutex);
+ if (!curlGlobalInited) {
+ curl_global_init(CURL_GLOBAL_ALL);
+ curlGlobalInited = 1;
+ }
+ pthread_mutex_unlock(&curlInitMutex);
+ }
+}
+
+static Response launchCmd(char *url, enum HttpHeader method, enum Redirect followloc) {
+ CURL *curl;
+ CURLcode res;
+ Response resp;
+
+ resp = (Response) calloc(1, sizeof(*resp));
+ if (!resp) {
+ return NULL;
+ }
+ resp->body = initResponseBuffer();
+ resp->header = initResponseBuffer();
+ initCurlGlobal();
+ curl = curl_easy_init(); /* get a curl handle */
+ if(curl) {
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
+ curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */
+ switch(method) {
+ case GET:
+ break;
+ case PUT:
+ curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
+ break;
+ case POST:
+ curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST");
+ break;
+ case DELETE:
+ curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
+ break;
+ default:
+ fprintf(stderr, "\nHTTP method not defined\n");
+ exit(EXIT_FAILURE);
+ }
+ if(followloc == YES) {
+ curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+ }
+
+ res = curl_easy_perform(curl); /* Now run the curl handler */
+ if(res != CURLE_OK) {
+ fprintf(stderr, "preform the URL %s failed\n", url);
+ return NULL;
+ }
+ curl_easy_cleanup(curl);
+ }
+ return resp;
+}
+
+static Response launchRead_internal(char *url, enum HttpHeader method, enum Redirect followloc, Response resp) {
+ if (!resp || !resp->body || !resp->body->content) {
+ fprintf(stderr, "The user provided buffer should not be NULL!\n");
+ return NULL;
+ }
+
+ CURL *curl;
+ CURLcode res;
+ initCurlGlobal();
+ curl = curl_easy_init(); /* get a curl handle */
+ if(curl) {
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc_withbuffer);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, resp->body);
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEHEADER, resp->header);
+ curl_easy_setopt(curl, CURLOPT_URL, url); /* specify target URL */
+ if(followloc == YES) {
+ curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+ }
+
+ res = curl_easy_perform(curl); /* Now run the curl handler */
+ if(res != CURLE_OK && res != CURLE_PARTIAL_FILE) {
+ fprintf(stderr, "preform the URL %s failed\n", url);
+ return NULL;
+ }
+ curl_easy_cleanup(curl);
+ }
+ return resp;
+
+}
+
+static Response launchWrite(const char *url, enum HttpHeader method, webhdfsBuffer *uploadBuffer) {
+ if (!uploadBuffer) {
+ fprintf(stderr, "upload buffer is NULL!\n");
+ errno = EINVAL;
+ return NULL;
+ }
+ initCurlGlobal();
+ CURLcode res;
+ Response response = (Response) calloc(1, sizeof(*response));
+ if (!response) {
+ fprintf(stderr, "failed to allocate memory for response\n");
+ return NULL;
+ }
+ response->body = initResponseBuffer();
+ response->header = initResponseBuffer();
+
+ //connect to the datanode in order to create the lease in the namenode
+ CURL *curl = curl_easy_init();
+ if (!curl) {
+ fprintf(stderr, "Failed to initialize the curl handle.\n");
+ return NULL;
+ }
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+
+ if(curl) {
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, response->body);
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writefunc);
+ curl_easy_setopt(curl, CURLOPT_WRITEHEADER, response->header);
+ curl_easy_setopt(curl, CURLOPT_READFUNCTION, readfunc);
+ curl_easy_setopt(curl, CURLOPT_READDATA, uploadBuffer);
+ curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
+
+ struct curl_slist *chunk = NULL;
+ chunk = curl_slist_append(chunk, "Transfer-Encoding: chunked");
+ res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
+ chunk = curl_slist_append(chunk, "Expect:");
+ res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
+
+ switch(method) {
+ case GET:
+ break;
+ case PUT:
+ curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
+ break;
+ case POST:
+ curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"POST");
+ break;
+ case DELETE:
+ curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
+ break;
+ default:
+ fprintf(stderr, "\nHTTP method not defined\n");
+ exit(EXIT_FAILURE);
+ }
+ res = curl_easy_perform(curl);
+ curl_slist_free_all(chunk);
+ curl_easy_cleanup(curl);
+ }
+
+ return response;
+}
+
+Response launchMKDIR(char *url) {
+ return launchCmd(url, PUT, NO);
+}
+
+Response launchRENAME(char *url) {
+ return launchCmd(url, PUT, NO);
+}
+
+Response launchGFS(char *url) {
+ return launchCmd(url, GET, NO);
+}
+
+Response launchLS(char *url) {
+ return launchCmd(url, GET, NO);
+}
+
+Response launchCHMOD(char *url) {
+ return launchCmd(url, PUT, NO);
+}
+
+Response launchCHOWN(char *url) {
+ return launchCmd(url, PUT, NO);
+}
+
+Response launchDELETE(char *url) {
+ return launchCmd(url, DELETE, NO);
+}
+
+Response launchOPEN(char *url, Response resp) {
+ return launchRead_internal(url, GET, YES, resp);
+}
+
+Response launchUTIMES(char *url) {
+ return launchCmd(url, PUT, NO);
+}
+
+Response launchNnWRITE(char *url) {
+ return launchCmd(url, PUT, NO);
+}
+
+Response launchNnAPPEND(char *url) {
+ return launchCmd(url, POST, NO);
+}
+
+Response launchDnWRITE(const char *url, webhdfsBuffer *buffer) {
+ return launchWrite(url, PUT, buffer);
+}
+
+Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer) {
+ return launchWrite(url, POST, buffer);
+}
+
+Response launchSETREPLICATION(char *url) {
+ return launchCmd(url, PUT, NO);
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_client.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+#ifndef _HDFS_HTTP_CLIENT_H_
+#define _HDFS_HTTP_CLIENT_H_
+
+#include "webhdfs.h"
+#include <curl/curl.h>
+
+enum HttpHeader {
+ GET,
+ PUT,
+ POST,
+ DELETE
+};
+
+enum Redirect {
+ YES,
+ NO
+};
+
+typedef struct {
+ char *content;
+ size_t remaining;
+ size_t offset;
+} ResponseBufferInternal;
+typedef ResponseBufferInternal *ResponseBuffer;
+
+/**
+ * The response got through webhdfs
+ */
+typedef struct {
+ ResponseBuffer body;
+ ResponseBuffer header;
+}* Response;
+
+ResponseBuffer initResponseBuffer();
+void freeResponseBuffer(ResponseBuffer buffer);
+void freeResponse(Response resp);
+
+Response launchMKDIR(char *url);
+Response launchRENAME(char *url);
+Response launchCHMOD(char *url);
+Response launchGFS(char *url);
+Response launchLS(char *url);
+Response launchDELETE(char *url);
+Response launchCHOWN(char *url);
+Response launchOPEN(char *url, Response resp);
+Response launchUTIMES(char *url);
+Response launchNnWRITE(char *url);
+
+Response launchDnWRITE(const char *url, webhdfsBuffer *buffer);
+Response launchNnAPPEND(char *url);
+Response launchSETREPLICATION(char *url);
+Response launchDnAPPEND(const char *url, webhdfsBuffer *buffer);
+
+#endif //_HDFS_HTTP_CLIENT_H_
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "hdfs_http_query.h"
+#include <math.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+
+#define NUM_OF_PERMISSION_BITS 4
+#define NUM_OF_PORT_BITS 6
+#define NUM_OF_REPLICATION_BITS 6
+
+static char *prepareQUERY(const char *host, int nnPort, const char *srcpath, const char *OP, const char *user) {
+ size_t length;
+ char *url;
+ const char *const protocol = "http://";
+ const char *const prefix = "/webhdfs/v1";
+ char *temp;
+ char *port;
+ port= (char*) malloc(NUM_OF_PORT_BITS);
+ if (!port) {
+ return NULL;
+ }
+ sprintf(port,"%d",nnPort);
+ if (user != NULL) {
+ length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP) + strlen("&user.name=") + strlen(user);
+ } else {
+ length = strlen(protocol) + strlen(host) + strlen(":") + strlen(port) + strlen(prefix) + strlen(srcpath) + strlen ("?op=") + strlen(OP);
+ }
+
+ temp = (char*) malloc(length + 1);
+ if (!temp) {
+ return NULL;
+ }
+ strcpy(temp,protocol);
+ temp = strcat(temp,host);
+ temp = strcat(temp,":");
+ temp = strcat(temp,port);
+ temp = strcat(temp,prefix);
+ temp = strcat(temp,srcpath);
+ temp = strcat(temp,"?op=");
+ temp = strcat(temp,OP);
+ if (user) {
+ temp = strcat(temp,"&user.name=");
+ temp = strcat(temp,user);
+ }
+ url = temp;
+ return url;
+}
+
+
+static int decToOctal(int decNo) {
+ int octNo=0;
+ int expo =0;
+ while (decNo != 0) {
+ octNo = ((decNo % 8) * pow(10,expo)) + octNo;
+ decNo = decNo / 8;
+ expo++;
+ }
+ return octNo;
+}
+
+
+char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user) {
+ return prepareQUERY(host, nnPort, dirsubpath, "MKDIRS", user);
+}
+
+
+char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) {
+ char *url;
+ char *permission;
+ permission = (char*) malloc(NUM_OF_PERMISSION_BITS);
+ if (!permission) {
+ return NULL;
+ }
+ mode = decToOctal(mode);
+ sprintf(permission,"%d",mode);
+ url = prepareMKDIR(host, nnPort, dirsubpath, user);
+ url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1));
+ if (!url) {
+ return NULL;
+ }
+ url = strcat(url,"&permission=");
+ url = strcat(url,permission);
+ return url;
+}
+
+
+char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user) {
+ char *url;
+ url = prepareQUERY(host, nnPort, srcpath, "RENAME", user);
+ url = realloc(url,(strlen(url) + strlen("&destination=") + strlen(destpath) + 1));
+ if (!url) {
+ return NULL;
+ }
+ url = strcat(url,"&destination=");
+ url = strcat(url,destpath);
+ return url;
+}
+
+char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user) {
+ return (prepareQUERY(host, nnPort, dirsubpath, "GETFILESTATUS", user));
+}
+
+char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user) {
+ return (prepareQUERY(host, nnPort, dirsubpath, "LISTSTATUS", user));
+}
+
+char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user) {
+ char *url;
+ char *permission;
+ permission = (char*) malloc(NUM_OF_PERMISSION_BITS);
+ if (!permission) {
+ return NULL;
+ }
+ mode &= 0x3FFF;
+ mode = decToOctal(mode);
+ sprintf(permission,"%d",mode);
+ url = prepareQUERY(host, nnPort, dirsubpath, "SETPERMISSION", user);
+ url = realloc(url,(strlen(url) + strlen("&permission=") + strlen(permission) + 1));
+ if (!url) {
+ return NULL;
+ }
+ url = strcat(url,"&permission=");
+ url = strcat(url,permission);
+ return url;
+}
+
+char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user) {
+ char *url = (prepareQUERY(host, nnPort, dirsubpath, "DELETE", user));
+ char *recursiveFlag = (char *)malloc(6);
+ if (!recursive) {
+ strcpy(recursiveFlag, "false");
+ } else {
+ strcpy(recursiveFlag, "true");
+ }
+ url = (char *) realloc(url, strlen(url) + strlen("&recursive=") + strlen(recursiveFlag) + 1);
+ if (!url) {
+ return NULL;
+ }
+
+ strcat(url, "&recursive=");
+ strcat(url, recursiveFlag);
+ return url;
+}
+
+char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user) {
+ char *url;
+ url = prepareQUERY(host, nnPort, dirsubpath, "SETOWNER", user);
+ if (!url) {
+ return NULL;
+ }
+ if(owner != NULL) {
+ url = realloc(url,(strlen(url) + strlen("&owner=") + strlen(owner) + 1));
+ url = strcat(url,"&owner=");
+ url = strcat(url,owner);
+ }
+ if (group != NULL) {
+ url = realloc(url,(strlen(url) + strlen("&group=") + strlen(group) + 1));
+ url = strcat(url,"&group=");
+ url = strcat(url,group);
+ }
+ return url;
+}
+
+char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length) {
+ char *base_url = prepareQUERY(host, nnPort, dirsubpath, "OPEN", user);
+ char *url = (char *) malloc(strlen(base_url) + strlen("&offset=") + 15 + strlen("&length=") + 15);
+ if (!url) {
+ return NULL;
+ }
+ sprintf(url, "%s&offset=%ld&length=%ld", base_url, offset, length);
+ return url;
+}
+
+char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user) {
+ char *url;
+ char *modTime;
+ char *acsTime;
+ modTime = (char*) malloc(12);
+ acsTime = (char*) malloc(12);
+ url = prepareQUERY(host, nnPort, dirsubpath, "SETTIMES", user);
+ sprintf(modTime,"%lu",mTime);
+ sprintf(acsTime,"%lu",aTime);
+ url = realloc(url,(strlen(url) + strlen("&modificationtime=") + strlen(modTime) + strlen("&accesstime=") + strlen(acsTime) + 1));
+ if (!url) {
+ return NULL;
+ }
+ url = strcat(url, "&modificationtime=");
+ url = strcat(url, modTime);
+ url = strcat(url,"&accesstime=");
+ url = strcat(url, acsTime);
+ return url;
+}
+
+char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize) {
+ char *url;
+ url = prepareQUERY(host, nnPort, dirsubpath, "CREATE", user);
+ url = realloc(url, (strlen(url) + strlen("&overwrite=true") + 1));
+ if (!url) {
+ return NULL;
+ }
+ url = strcat(url, "&overwrite=true");
+ if (replication > 0) {
+ url = realloc(url, (strlen(url) + strlen("&replication=") + 6));
+ if (!url) {
+ return NULL;
+ }
+ sprintf(url, "%s&replication=%d", url, replication);
+ }
+ if (blockSize > 0) {
+ url = realloc(url, (strlen(url) + strlen("&blocksize=") + 16));
+ if (!url) {
+ return NULL;
+ }
+ sprintf(url, "%s&blocksize=%ld", url, blockSize);
+ }
+ return url;
+}
+
+char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user) {
+ return (prepareQUERY(host, nnPort, dirsubpath, "APPEND", user));
+}
+
+char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user)
+{
+ char *url = prepareQUERY(host, nnPort, path, "SETREPLICATION", user);
+ char *replicationNum = (char *) malloc(NUM_OF_REPLICATION_BITS);
+ sprintf(replicationNum, "%u", replication);
+ url = realloc(url, strlen(url) + strlen("&replication=") + strlen(replicationNum)+ 1);
+ if (!url) {
+ return NULL;
+ }
+
+ url = strcat(url, "&replication=");
+ url = strcat(url, replicationNum);
+ return url;
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_http_query.h Mon Sep 10 13:43:28 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef _HDFS_HTTP_QUERY_H_
+#define _HDFS_HTTP_QUERY_H_
+
+#include <stdint.h>
+#include <stdio.h>
+
+char *prepareMKDIR(const char *host, int nnPort, const char *dirsubpath, const char *user);
+char *prepareMKDIRwithMode(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user);
+char *prepareRENAME(const char *host, int nnPort, const char *srcpath, const char *destpath, const char *user);
+char *prepareCHMOD(const char *host, int nnPort, const char *dirsubpath, int mode, const char *user);
+char *prepareGFS(const char *host, int nnPort, const char *dirsubpath, const char *user);
+char *prepareLS(const char *host, int nnPort, const char *dirsubpath, const char *user);
+char *prepareDELETE(const char *host, int nnPort, const char *dirsubpath, int recursive, const char *user);
+char *prepareCHOWN(const char *host, int nnPort, const char *dirsubpath, const char *owner, const char *group, const char *user);
+char *prepareOPEN(const char *host, int nnPort, const char *dirsubpath, const char *user, size_t offset, size_t length);
+char *prepareUTIMES(const char *host, int nnPort, const char *dirsubpath, long unsigned mTime, long unsigned aTime, const char *user);
+char *prepareNnWRITE(const char *host, int nnPort, const char *dirsubpath, const char *user, int16_t replication, size_t blockSize);
+char *prepareNnAPPEND(const char *host, int nnPort, const char *dirsubpath, const char *user);
+char *prepareSETREPLICATION(const char *host, int nnPort, const char *path, int16_t replication, const char *user);
+
+
+#endif //_HDFS_HTTP_QUERY_H_
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_jni.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_jni.c?rev=1382836&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_jni.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_jni.c Mon Sep 10 13:43:28 2012
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include "webhdfs.h"
+#include "jni_helper.h"
+#include "exception.h"
+
+/* Some frequently used Java paths */
+#define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
+#define HADOOP_PATH "org/apache/hadoop/fs/Path"
+#define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem"
+#define HADOOP_FS "org/apache/hadoop/fs/FileSystem"
+#define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus"
+#define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation"
+#define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem"
+#define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream"
+#define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
+#define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
+#define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
+#define JAVA_NET_ISA "java/net/InetSocketAddress"
+#define JAVA_NET_URI "java/net/URI"
+#define JAVA_STRING "java/lang/String"
+
+#define JAVA_VOID "V"
+
+/* Macros for constructing method signatures */
+#define JPARAM(X) "L" X ";"
+#define JARRPARAM(X) "[L" X ";"
+#define JMETHOD1(X, R) "(" X ")" R
+#define JMETHOD2(X, Y, R) "(" X Y ")" R
+#define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
+
+#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
+
+/**
+ * Helper function to create a org.apache.hadoop.fs.Path object.
+ * @param env: The JNIEnv pointer.
+ * @param path: The file-path for which to construct org.apache.hadoop.fs.Path
+ * object.
+ * @return Returns a jobject on success and NULL on error.
+ */
+static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
+ jobject *out)
+{
+ jthrowable jthr;
+ jstring jPathString;
+ jobject jPath;
+
+ //Construct a java.lang.String object
+ jthr = newJavaStr(env, path, &jPathString);
+ if (jthr)
+ return jthr;
+ //Construct the org.apache.hadoop.fs.Path object
+ jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path",
+ "(Ljava/lang/String;)V", jPathString);
+ destroyLocalReference(env, jPathString);
+ if (jthr)
+ return jthr;
+ *out = jPath;
+ return NULL;
+}
+
+/**
+ * Set a configuration value.
+ *
+ * @param env The JNI environment
+ * @param jConfiguration The configuration object to modify
+ * @param key The key to modify
+ * @param value The value to set the key to
+ *
+ * @return NULL on success; exception otherwise
+ */
+static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+ const char *key, const char *value)
+{
+ jthrowable jthr;
+ jstring jkey = NULL, jvalue = NULL;
+
+ jthr = newJavaStr(env, key, &jkey);
+ if (jthr)
+ goto done;
+ jthr = newJavaStr(env, value, &jvalue);
+ if (jthr)
+ goto done;
+ jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
+ HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
+ JPARAM(JAVA_STRING), JAVA_VOID),
+ jkey, jvalue);
+ if (jthr)
+ goto done;
+done:
+ destroyLocalReference(env, jkey);
+ destroyLocalReference(env, jvalue);
+ return jthr;
+}
+
+static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
+ const char *key, char **val)
+{
+ jthrowable jthr;
+ jvalue jVal;
+ jstring jkey = NULL, jRet = NULL;
+
+ jthr = newJavaStr(env, key, &jkey);
+ if (jthr)
+ goto done;
+ jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
+ HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
+ JPARAM(JAVA_STRING)), jkey);
+ if (jthr)
+ goto done;
+ jRet = jVal.l;
+ jthr = newCStr(env, jRet, val);
+done:
+ destroyLocalReference(env, jkey);
+ destroyLocalReference(env, jRet);
+ return jthr;
+}
+
+int hdfsConfGetStr(const char *key, char **val)
+{
+ JNIEnv *env;
+ int ret;
+ jthrowable jthr;
+ jobject jConfiguration = NULL;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ ret = EINTERNAL;
+ goto done;
+ }
+ jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsConfGetStr(%s): new Configuration", key);
+ goto done;
+ }
+ jthr = hadoopConfGetStr(env, jConfiguration, key, val);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsConfGetStr(%s): hadoopConfGetStr", key);
+ goto done;
+ }
+ ret = 0;
+done:
+ destroyLocalReference(env, jConfiguration);
+ if (ret)
+ errno = ret;
+ return ret;
+}
+
+void hdfsConfStrFree(char *val)
+{
+ free(val);
+}
+
+static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
+ const char *key, int32_t *val)
+{
+ jthrowable jthr = NULL;
+ jvalue jVal;
+ jstring jkey = NULL;
+
+ jthr = newJavaStr(env, key, &jkey);
+ if (jthr)
+ return jthr;
+ jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
+ HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
+ jkey, (jint)(*val));
+ destroyLocalReference(env, jkey);
+ if (jthr)
+ return jthr;
+ *val = jVal.i;
+ return NULL;
+}
+
+int hdfsConfGetInt(const char *key, int32_t *val)
+{
+ JNIEnv *env;
+ int ret;
+ jobject jConfiguration = NULL;
+ jthrowable jthr;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ ret = EINTERNAL;
+ goto done;
+ }
+ jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsConfGetInt(%s): new Configuration", key);
+ goto done;
+ }
+ jthr = hadoopConfGetInt(env, jConfiguration, key, val);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsConfGetInt(%s): hadoopConfGetInt", key);
+ goto done;
+ }
+ ret = 0;
+done:
+ destroyLocalReference(env, jConfiguration);
+ if (ret)
+ errno = ret;
+ return ret;
+}
+
+/**
+ * Calculate the effective URI to use, given a builder configuration.
+ *
+ * If there is not already a URI scheme, we prepend 'hdfs://'.
+ *
+ * If there is not already a port specified, and a port was given to the
+ * builder, we suffix that port. If there is a port specified but also one in
+ * the URI, that is an error.
+ *
+ * @param bld The hdfs builder object
+ * @param uri (out param) dynamically allocated string representing the
+ * effective URI
+ *
+ * @return 0 on success; error code otherwise
+ */
+static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri)
+{
+ const char *scheme;
+ char suffix[64];
+ const char *lastColon;
+ char *u;
+ size_t uriLen;
+
+ if (!bld->nn_jni)
+ return EINVAL;
+ scheme = (strstr(bld->nn_jni, "://")) ? "" : "hdfs://";
+ if (bld->port == 0) {
+ suffix[0] = '\0';
+ } else {
+ lastColon = rindex(bld->nn_jni, ':');
+ if (lastColon && (strspn(lastColon + 1, "0123456789") ==
+ strlen(lastColon + 1))) {
+ fprintf(stderr, "port %d was given, but URI '%s' already "
+ "contains a port!\n", bld->port, bld->nn_jni);
+ return EINVAL;
+ }
+ snprintf(suffix, sizeof(suffix), ":%d", bld->port);
+ }
+
+ uriLen = strlen(scheme) + strlen(bld->nn_jni) + strlen(suffix);
+ u = malloc((uriLen + 1) * (sizeof(char)));
+ if (!u) {
+ fprintf(stderr, "calcEffectiveURI: out of memory");
+ return ENOMEM;
+ }
+ snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn_jni, suffix);
+ *uri = u;
+ return 0;
+}
+
+static const char *maybeNull(const char *str)
+{
+ return str ? str : "(NULL)";
+}
+
+const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
+ char *buf, size_t bufLen)
+{
+ snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, "
+ "kerbTicketCachePath=%s, userName=%s, workingDir=%s\n",
+ bld->forceNewInstance, maybeNull(bld->nn), bld->port,
+ maybeNull(bld->kerbTicketCachePath),
+ maybeNull(bld->userName), maybeNull(bld->workingDir));
+ return buf;
+}
+
+/*
+ * The JNI version of builderConnect, return the reflection of FileSystem
+ */
+jobject hdfsBuilderConnect_JNI(JNIEnv *env, struct hdfsBuilder *bld)
+{
+ jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
+ jstring jURIString = NULL, jUserString = NULL;
+ jvalue jVal;
+ jthrowable jthr = NULL;
+ char *cURI = 0, buf[512];
+ int ret;
+ jobject jRet = NULL;
+
+ // jConfiguration = new Configuration();
+ jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+
+ //Check what type of FileSystem the caller wants...
+ if (bld->nn_jni == NULL) {
+ // Get a local filesystem.
+ // Also handle the scenario where nn of hdfsBuilder is set to localhost.
+ if (bld->forceNewInstance) {
+ // fs = FileSytem#newInstanceLocal(conf);
+ jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
+ "newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
+ JPARAM(HADOOP_LOCALFS)), jConfiguration);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ jFS = jVal.l;
+ } else {
+ // fs = FileSytem#getLocal(conf);
+ jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal",
+ JMETHOD1(JPARAM(HADOOP_CONF),
+ JPARAM(HADOOP_LOCALFS)),
+ jConfiguration);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ jFS = jVal.l;
+ }
+ } else {
+ if (!strcmp(bld->nn_jni, "default")) {
+ // jURI = FileSystem.getDefaultUri(conf)
+ jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
+ "getDefaultUri",
+ "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
+ jConfiguration);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ jURI = jVal.l;
+ } else {
+ // fs = FileSystem#get(URI, conf, ugi);
+ ret = calcEffectiveURI(bld, &cURI);
+ if (ret)
+ goto done;
+ jthr = newJavaStr(env, cURI, &jURIString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI,
+ "create", "(Ljava/lang/String;)Ljava/net/URI;",
+ jURIString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ jURI = jVal.l;
+ }
+
+ if (bld->kerbTicketCachePath) {
+ jthr = hadoopConfSetStr(env, jConfiguration,
+ KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ }
+ jthr = newJavaStr(env, bld->userName, &jUserString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ if (bld->forceNewInstance) {
+ jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
+ "newInstance", JMETHOD3(JPARAM(JAVA_NET_URI),
+ JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
+ JPARAM(HADOOP_FS)),
+ jURI, jConfiguration, jUserString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ jFS = jVal.l;
+ } else {
+ jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get",
+ JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
+ JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
+ jURI, jConfiguration, jUserString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ jFS = jVal.l;
+ }
+ }
+ jRet = (*env)->NewGlobalRef(env, jFS);
+ if (!jRet) {
+ ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+ "hdfsBuilderConnect_JNI(%s)",
+ hdfsBuilderToStr(bld, buf, sizeof(buf)));
+ goto done;
+ }
+ ret = 0;
+
+done:
+ // Release unnecessary local references
+ destroyLocalReference(env, jConfiguration);
+ destroyLocalReference(env, jFS);
+ destroyLocalReference(env, jURI);
+ destroyLocalReference(env, jCachePath);
+ destroyLocalReference(env, jURIString);
+ destroyLocalReference(env, jUserString);
+ free(cURI);
+
+ if (ret) {
+ errno = ret;
+ return NULL;
+ }
+ return jRet;
+}
+
+int hdfsDisconnect_JNI(jobject jFS)
+{
+ // JAVA EQUIVALENT:
+ // fs.close()
+
+ //Get the JNIEnv* corresponding to current thread
+ JNIEnv* env = getJNIEnv();
+ int ret;
+
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+
+ //Sanity check
+ if (jFS == NULL) {
+ errno = EBADF;
+ return -1;
+ }
+
+ jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
+ "close", "()V");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsDisconnect: FileSystem#close");
+ } else {
+ ret = 0;
+ }
+ (*env)->DeleteGlobalRef(env, jFS);
+ if (ret) {
+ errno = ret;
+ return -1;
+ }
+ return 0;
+}
+
+static int hdfsCopyImpl(hdfsFS srcFS, const char* src, hdfsFS dstFS,
+ const char* dst, jboolean deleteSource)
+{
+ //JAVA EQUIVALENT
+ // FileUtil#copy(srcFS, srcPath, dstFS, dstPath,
+ // deleteSource = false, conf)
+
+ //Get the JNIEnv* corresponding to current thread
+ JNIEnv* env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+
+ //In libwebhdfs, the hdfsFS derived from hdfsBuilderConnect series functions
+ //is actually a hdfsBuilder instance containing address information of NameNode.
+ //Thus here we need to use JNI to get the real java FileSystem objects.
+ jobject jSrcFS = hdfsBuilderConnect_JNI(env, (struct hdfsBuilder *) srcFS);
+ jobject jDstFS = hdfsBuilderConnect_JNI(env, (struct hdfsBuilder *) dstFS);
+
+ //Parameters
+ jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL;
+ jthrowable jthr;
+ jvalue jVal;
+ int ret;
+
+ jthr = constructNewObjectOfPath(env, src, &jSrcPath);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsCopyImpl(src=%s): constructNewObjectOfPath", src);
+ goto done;
+ }
+ jthr = constructNewObjectOfPath(env, dst, &jDstPath);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst);
+ goto done;
+ }
+
+ //Create the org.apache.hadoop.conf.Configuration object
+ jthr = constructNewObjectOfClass(env, &jConfiguration,
+ HADOOP_CONF, "()V");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsCopyImpl: Configuration constructor");
+ goto done;
+ }
+
+ //FileUtil#copy
+ jthr = invokeMethod(env, &jVal, STATIC,
+ NULL, "org/apache/hadoop/fs/FileUtil", "copy",
+ "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
+ "Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
+ "ZLorg/apache/hadoop/conf/Configuration;)Z",
+ jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource,
+ jConfiguration);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): "
+ "FileUtil#copy", src, dst, deleteSource);
+ goto done;
+ }
+ if (!jVal.z) {
+ ret = EIO;
+ goto done;
+ }
+ ret = 0;
+
+done:
+ destroyLocalReference(env, jConfiguration);
+ destroyLocalReference(env, jSrcPath);
+ destroyLocalReference(env, jDstPath);
+ //Disconnect src/dst FileSystem
+ hdfsDisconnect_JNI(jSrcFS);
+ hdfsDisconnect_JNI(jDstFS);
+
+ if (ret) {
+ errno = ret;
+ return -1;
+ }
+ return 0;
+}
+
+int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
+{
+ return hdfsCopyImpl(srcFS, src, dstFS, dst, 0);
+}
+
+int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
+{
+ return hdfsCopyImpl(srcFS, src, dstFS, dst, 1);
+}
+
+tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
+{
+ // JAVA EQUIVALENT:
+ // fs.getDefaultBlockSize();
+
+ //Get the JNIEnv* corresponding to current thread
+ JNIEnv* env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+
+ //In libwebhdfs, the hdfsFS derived from hdfsConnect functions
+ //is actually a hdfsBuilder instance containing address information of NameNode.
+ //Thus here we need to use JNI to get the real java FileSystem objects.
+ jobject jFS = hdfsBuilderConnect_JNI(env, (struct hdfsBuilder *) fs);
+
+ //FileSystem#getDefaultBlockSize()
+ jvalue jVal;
+ jthrowable jthr;
+ jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
+ "getDefaultBlockSize", "()J");
+ if (jthr) {
+ errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize");
+ //Disconnect
+ hdfsDisconnect_JNI(jFS);
+ return -1;
+ }
+
+ //Disconnect
+ hdfsDisconnect_JNI(jFS);
+ return jVal.j;
+}
+
+
+