You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/07/20 21:15:54 UTC
svn commit: r1363904 [1/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/
src/main/native/fuse-dfs/ src/main/native/libhdfs/ src/main/native/util/
src/main/resources/
Author: atm
Date: Fri Jul 20 19:15:52 2012
New Revision: 1363904
URL: http://svn.apache.org/viewvc?rev=1363904&view=rev
Log:
HDFS-3608. fuse_dfs: detect changes in UID ticket cache. Contributed by Colin Patrick McCabe.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/tree.h
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 20 19:15:52 2012
@@ -520,6 +520,9 @@ Branch-2 ( Unreleased changes )
HDFS-3597. SNN fails to start after DFS upgrade. (Andy Isaacson via todd)
+ HDFS-3608. fuse_dfs: detect changes in UID ticket cache. (Colin Patrick
+ McCabe via atm)
+
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Fri Jul 20 19:15:52 2012
@@ -87,6 +87,7 @@ include_directories(
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_BINARY_DIR}
${JNI_INCLUDE_DIRS}
+ main/native
main/native/libhdfs
)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt Fri Jul 20 19:15:52 2012
@@ -69,5 +69,6 @@ IF(FUSE_FOUND)
${JAVA_JVM_LIBRARY}
hdfs
m
+ pthread
)
ENDIF(FUSE_FOUND)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c Fri Jul 20 19:15:52 2012
@@ -16,17 +16,38 @@
* limitations under the License.
*/
-#include "hdfs.h"
-#include "fuse_dfs.h"
#include "fuse_connect.h"
+#include "fuse_dfs.h"
#include "fuse_users.h"
+#include "libhdfs/hdfs.h"
+#include "util/tree.h"
+#include <inttypes.h>
#include <limits.h>
+#include <poll.h>
#include <search.h>
#include <stdio.h>
#include <stdlib.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <utime.h>
+
+#define FUSE_CONN_DEFAULT_TIMER_PERIOD 5
+#define FUSE_CONN_DEFAULT_EXPIRY_PERIOD (5 * 60)
+#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
+#define HADOOP_FUSE_CONNECTION_TIMEOUT "hadoop.fuse.connection.timeout"
+#define HADOOP_FUSE_TIMER_PERIOD "hadoop.fuse.timer.period"
+
+/** Length of the buffer needed by asctime_r */
+#define TIME_STR_LEN 26
+
+struct hdfsConn;
+
+static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
+static void hdfsConnExpiry(void);
+static void* hdfsConnExpiryThread(void *v);
-#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
+RB_HEAD(hdfsConnTree, hdfsConn);
enum authConf {
AUTH_CONF_UNKNOWN,
@@ -34,80 +55,308 @@ enum authConf {
AUTH_CONF_OTHER,
};
-#define MAX_ELEMENTS (16 * 1024)
-static struct hsearch_data *fsTable = NULL;
-static enum authConf hdfsAuthConf = AUTH_CONF_UNKNOWN;
-static pthread_mutex_t tableMutex = PTHREAD_MUTEX_INITIALIZER;
-
-/*
- * Allocate a hash table for fs handles. Returns 0 on success,
- * -1 on failure.
- */
-int allocFsTable(void) {
- assert(NULL == fsTable);
- fsTable = calloc(1, sizeof(struct hsearch_data));
- if (0 == hcreate_r(MAX_ELEMENTS, fsTable)) {
- ERROR("Unable to initialize connection table");
- return -1;
+struct hdfsConn {
+ RB_ENTRY(hdfsConn) entry;
+ /** How many threads are currently using this hdfsConnection object */
+ int64_t refcnt;
+ /** The username used to make this connection. Dynamically allocated. */
+ char *usrname;
+ /** Kerberos ticket cache path, or NULL if this is not a kerberized
+ * connection. Dynamically allocated. */
+ char *kpath;
+ /** mtime of the kpath, if the kpath is non-NULL */
+ time_t kPathMtime;
+ /** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
+ long kPathMtimeNs;
+ /** The cached libhdfs fs instance */
+ hdfsFS fs;
+ /** Nonzero if this hdfs connection needs to be closed as soon as possible.
+ * If this is true, the connection has been removed from the tree. */
+ int condemned;
+ /** Number of times we should run the expiration timer on this connection
+ * before removing it. */
+ int expirationCount;
+};
+
+RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);
+
+/** Current cached libhdfs connections */
+static struct hdfsConnTree gConnTree;
+
+/** The URI used to make our connections. Dynamically allocated. */
+static char *gUri;
+
+/** The port used to make our connections, or 0. */
+static int gPort;
+
+/** Lock which protects gConnTree and gConnectTimer->active */
+static pthread_mutex_t gConnMutex;
+
+/** Type of authentication configured */
+static enum authConf gHdfsAuthConf;
+
+/** FUSE connection timer expiration period */
+static int32_t gTimerPeriod;
+
+/** FUSE connection expiry period */
+static int32_t gExpiryPeriod;
+
+/** FUSE timer expiration thread */
+static pthread_t gTimerThread;
+
+/**
+ * Find out what type of authentication the system administrator
+ * has configured.
+ *
+ * @return the type of authentication, or AUTH_CONF_UNKNOWN on error.
+ */
+static enum authConf discoverAuthConf(void)
+{
+  char *confVal = NULL;
+  /* Anything that is not an error and not "kerberos" counts as OTHER,
+   * including an unset (NULL) value. */
+  enum authConf result = AUTH_CONF_OTHER;
+
+  if (hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &confVal)) {
+    result = AUTH_CONF_UNKNOWN;
+  } else if (confVal && (strcmp(confVal, "kerberos") == 0)) {
+    result = AUTH_CONF_KERBEROS;
+  }
+  free(confVal);
+  return result;
+}
+
+/**
+ * Initialize the fuse connection subsystem.
+ *
+ * Starting from the compiled-in defaults, reads the timer period and the
+ * connection expiry period from the Hadoop configuration, determines the
+ * configured authentication type, records the NameNode URI and port,
+ * initializes the connection tree and its mutex, and starts the connection
+ * expiry thread.
+ *
+ * @param nnUri The NameNode URI (copied; the caller keeps ownership)
+ * @param port  The NameNode port
+ *
+ * @return 0 on success; negative error code otherwise
+ */
+int fuseConnectInit(const char *nnUri, int port)
+{
+  int ret;
+
+  gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;
+  /* The timer period has its own configuration key; it must not be read
+   * from the connection timeout key. */
+  ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod);
+  if (ret) {
+    fprintf(stderr, "Unable to determine the configured value for %s.\n",
+            HADOOP_FUSE_TIMER_PERIOD);
+    return -EINVAL;
+  }
+  if (gTimerPeriod < 1) {
+    fprintf(stderr, "Invalid value %d given for %s.\n",
+            gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD);
+    return -EINVAL;
+  }
+  gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD;
+  ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
+  if (ret) {
+    fprintf(stderr, "Unable to determine the configured value for %s.\n",
+            HADOOP_FUSE_CONNECTION_TIMEOUT);
+    return -EINVAL;
}
+  if (gExpiryPeriod < 1) {
+    fprintf(stderr, "Invalid value %d given for %s.\n",
+            gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT);
+    return -EINVAL;
+  }
+  gHdfsAuthConf = discoverAuthConf();
+  if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
+    fprintf(stderr, "Unable to determine the configured value for %s.\n",
+            HADOOP_SECURITY_AUTHENTICATION);
+    return -EINVAL;
+  }
+  gPort = port;
+  gUri = strdup(nnUri);
+  if (!gUri) {
+    fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
+    return -ENOMEM;
+  }
+  ret = pthread_mutex_init(&gConnMutex, NULL);
+  if (ret) {
+    free(gUri);
+    fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
+            ret);
+    return -ret;
+  }
+  RB_INIT(&gConnTree);
+  /* The expiry thread runs for the lifetime of the process. */
+  ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
+  if (ret) {
+    free(gUri);
+    pthread_mutex_destroy(&gConnMutex);
+    fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
+            ret);
+    return -ret;
+  }
+  fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
+          "expiry period %d\n", gTimerPeriod, gExpiryPeriod);
return 0;
}
-/*
- * Find a fs handle for the given key. Returns a fs handle,
- * or NULL if there is no fs for the given key.
- */
-static hdfsFS findFs(char *key) {
- ENTRY entry;
- ENTRY *entryP = NULL;
- entry.key = key;
- if (0 == hsearch_r(entry, FIND, &entryP, fsTable)) {
- return NULL;
- }
- assert(NULL != entryP->data);
- return (hdfsFS)entryP->data;
-}
-
-/*
- * Insert the given fs handle into the table.
- * Returns 0 on success, -1 on failure.
- */
-static int insertFs(char *key, hdfsFS fs) {
- ENTRY entry;
- ENTRY *entryP = NULL;
- assert(NULL != fs);
- entry.key = strdup(key);
- if (entry.key == NULL) {
- return -1;
- }
- entry.data = (void*)fs;
- if (0 == hsearch_r(entry, ENTER, &entryP, fsTable)) {
- return -1;
+/**
+ * Compare two libhdfs connections by username
+ *
+ * @param a The first libhdfs connection
+ * @param b The second libhdfs connection
+ *
+ * @return -1, 0, or 1 depending on a < b, a ==b, a > b
+ */
+static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
+{
+  /* The tree is keyed purely on the username string. */
+  const char *lhs = a->usrname;
+  const char *rhs = b->usrname;
+
+  return strcmp(lhs, rhs);
+}
+
+/**
+ * Find a libhdfs connection by username
+ *
+ * @param usrname The username to look up
+ *
+ * @return The connection, or NULL if none could be found
+ */
+static struct hdfsConn* hdfsConnFind(const char *usrname)
+{
+  /* Lookup key: only usrname is consulted by hdfsConnCompare, every other
+   * field is zero.  The cast drops const; the key is never written through. */
+  struct hdfsConn key = { .usrname = (char*)usrname };
+
+  return RB_FIND(hdfsConnTree, &gConnTree, &key);
+}
+
+/**
+ * Free the resource associated with a libhdfs connection.
+ *
+ * You must remove the connection from the tree before calling this function.
+ *
+ * @param conn The libhdfs connection
+ */
+static void hdfsConnFree(struct hdfsConn *conn)
+{
+  const char *who = conn->usrname ? conn->usrname : "(null)";
+  int rc = hdfsDisconnect(conn->fs);
+
+  if (rc != 0) {
+    /* A failed disconnect is only reported; the memory is released anyway. */
+    fprintf(stderr, "hdfsConnFree(username=%s): "
+            "hdfsDisconnect failed with error %d\n", who, rc);
+  }
+  free(conn->kpath);
+  free(conn->usrname);
+  free(conn);
+}
+
+/**
+ * Convert a time_t to a string.
+ *
+ * On every return path the buffer holds a NUL-terminated string (provided
+ * bufLen is nonzero), because callers print it without checking the result.
+ *
+ * @param sec       time in seconds since the epoch
+ * @param buf       (out param) output buffer
+ * @param bufLen    length of output buffer; must be at least TIME_STR_LEN
+ *
+ * @return          0 on success; -ENAMETOOLONG if the provided buffer was
+ *                  too short; -EINVAL if the time could not be converted
+ */
+static int timeToStr(time_t sec, char *buf, size_t bufLen)
+{
+  struct tm tm;
+  size_t l;
+
+  if (bufLen < TIME_STR_LEN) {
+    if (bufLen > 0)
+      buf[0] = '\0';
+    return -ENAMETOOLONG;
}
+  /* localtime_r and asctime_r both return NULL on failure; never hand a NULL
+   * struct tm to asctime_r. */
+  if (!localtime_r(&sec, &tm) || !asctime_r(&tm, buf)) {
+    snprintf(buf, bufLen, "(unknown)");
+    return -EINVAL;
+  }
+  // strip trailing newline
+  l = strlen(buf);
+  if ((l != 0) && (buf[l - 1] == '\n'))
+    buf[l - 1] = '\0';
return 0;
}
/**
- * Find out what type of authentication the system administrator
- * has configured.
+ * Check an HDFS connection's Kerberos path.
*
- * @return the type of authentication, or AUTH_CONF_UNKNOWN on error.
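+ * Compares the current mtime of the Kerberos ticket cache file against the
+ * mtime recorded in the connection.  This function only reports the result;
+ * the caller (hdfsConnExpiry) is responsible for marking the connection as
+ * condemned and removing it from the hdfs connection tree.
+ *
+ * @param conn The HDFS connection
+ *
+ * @return 0 if the ticket cache file is unchanged; a negative error code if
+ *         it could not be stat'ed or if its mtime has changed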
*/
-static enum authConf discoverAuthConf(void)
+static int hdfsConnCheckKpath(const struct hdfsConn *conn)
{
- int ret;
- char *val = NULL;
- enum authConf authConf;
-
- ret = hdfsConfGet(HADOOP_SECURITY_AUTHENTICATION, &val);
- if (ret)
- authConf = AUTH_CONF_UNKNOWN;
- else if (!strcmp(val, "kerberos"))
- authConf = AUTH_CONF_KERBEROS;
- else
- authConf = AUTH_CONF_OTHER;
- free(val);
- return authConf;
+ int ret;
+ struct stat st;
+ char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN];
+
+ if (stat(conn->kpath, &st) < 0) {
+ ret = errno;
+ if (ret == ENOENT) {
+ fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
+ "ticket cache file '%s' has disappeared. Condemning the "
+ "connection.\n", conn->usrname, conn->kpath);
+ } else {
+ fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
+ "failed with error code %d. Pessimistically condemning the "
+ "connection.\n", conn->usrname, conn->kpath, ret);
+ }
+ return -ret;
+ }
+ if ((st.st_mtim.tv_sec != conn->kPathMtime) ||
+ (st.st_mtim.tv_nsec != conn->kPathMtimeNs)) {
+ timeToStr(conn->kPathMtime, prevTimeBuf, sizeof(prevTimeBuf));
+ timeToStr(st.st_mtim.tv_sec, newTimeBuf, sizeof(newTimeBuf));
+ fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
+ "has changed from '%s' to '%s'. Condemning the connection "
+ "because our cached Kerberos credentials have probably "
+ "changed.\n", conn->usrname, conn->kpath, prevTimeBuf, newTimeBuf);
+ return -EINTERNAL;
+ }
+ return 0;
+}
+
+/**
+ * Cache expiration logic.
+ *
+ * This function is called periodically by the cache expiration thread. For
+ * each FUSE connection not currently in use (refcnt == 0) it will decrement the
+ * expirationCount for that connection. Once the expirationCount reaches 0 for
+ * a connection, it can be garbage collected.
+ *
+ * We also check to see if the Kerberos credentials have changed. If so, the
+ * connection is immediately condemned, even if it is currently in use.
+ */
+static void hdfsConnExpiry(void)
+{
+  struct hdfsConn *cur, *next;
+
+  pthread_mutex_lock(&gConnMutex);
+  RB_FOREACH_SAFE(cur, hdfsConnTree, &gConnTree, next) {
+    if (cur->kpath && hdfsConnCheckKpath(cur)) {
+      /* The ticket cache changed or vanished: take the connection out of the
+       * tree so that no new user can pick it up. */
+      cur->condemned = 1;
+      RB_REMOVE(hdfsConnTree, &gConnTree, cur);
+      if (cur->refcnt == 0) {
+        /* Nobody is using it, so it can be destroyed right away.  Otherwise
+         * the last user destroys it inside hdfsConnRelease. */
+        hdfsConnFree(cur);
+        continue;
+      }
+    }
+    if (cur->refcnt != 0) {
+      /* Connections that are in use never age. */
+      continue;
+    }
+    /* Idle connection: age it by one timer tick. */
+    cur->expirationCount--;
+    if (cur->expirationCount > 0) {
+      continue;
+    }
+    if (cur->condemned) {
+      fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
+              "as %s is still in the tree!\n", cur->usrname);
+    }
+    fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
+            "%s because it's now too old.\n", cur->usrname);
+    RB_REMOVE(hdfsConnTree, &gConnTree, cur);
+    hdfsConnFree(cur);
+  }
+  pthread_mutex_unlock(&gConnMutex);
}
/**
@@ -129,9 +378,9 @@ static enum authConf discoverAuthConf(vo
* @param path (out param) the path to the ticket cache file
* @param pathLen length of the path buffer
*/
-static void findKerbTicketCachePath(char *path, size_t pathLen)
+static void findKerbTicketCachePath(struct fuse_context *ctx,
+ char *path, size_t pathLen)
{
- struct fuse_context *ctx = fuse_get_context();
FILE *fp = NULL;
static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
int c = '\0', pathIdx = 0, keyIdx = 0;
@@ -168,72 +417,213 @@ done:
}
}
-/*
- * Connect to the NN as the current user/group.
- * Returns a fs handle on success, or NULL on failure.
- */
-hdfsFS doConnectAsUser(const char *nn_uri, int nn_port) {
- struct hdfsBuilder *bld;
- uid_t uid = fuse_get_context()->uid;
- char *user = getUsername(uid);
- char kpath[PATH_MAX];
+/**
+ * Create a new libhdfs connection.
+ *
+ * @param usrname Username to use for the new connection
+ * @param ctx FUSE context to use for the new connection
+ * @param out (out param) the new libhdfs connection
+ *
+ * @return 0 on success; error code otherwise
+ */
+static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
+ struct hdfsConn **out)
+{
+ struct hdfsBuilder *bld = NULL;
+ char kpath[PATH_MAX] = { 0 };
+ struct hdfsConn *conn = NULL;
int ret;
- hdfsFS fs = NULL;
- if (NULL == user) {
- goto done;
- }
-
- ret = pthread_mutex_lock(&tableMutex);
- assert(0 == ret);
+ struct stat st;
- fs = findFs(user);
- if (NULL == fs) {
- if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
- hdfsAuthConf = discoverAuthConf();
- if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
- ERROR("Unable to determine the configured value for %s.",
- HADOOP_SECURITY_AUTHENTICATION);
- goto done;
- }
- }
- bld = hdfsNewBuilder();
- if (!bld) {
- ERROR("Unable to create hdfs builder");
- goto done;
+ conn = calloc(1, sizeof(struct hdfsConn));
+ if (!conn) {
+ fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
+ ret = -ENOMEM;
+ goto error;
+ }
+ bld = hdfsNewBuilder();
+ if (!bld) {
+ fprintf(stderr, "Unable to create hdfs builder\n");
+ ret = -ENOMEM;
+ goto error;
+ }
+ /* We always want to get a new FileSystem instance here-- that's why we call
+ * hdfsBuilderSetForceNewInstance. Otherwise the 'cache condemnation' logic
+ * in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
+ * existing cached connection which we wanted to get rid of.
+ */
+ hdfsBuilderSetForceNewInstance(bld);
+ hdfsBuilderSetNameNode(bld, gUri);
+ if (gPort) {
+ hdfsBuilderSetNameNodePort(bld, gPort);
+ }
+ hdfsBuilderSetUserName(bld, usrname);
+ if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
+ findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
+ if (stat(kpath, &st) < 0) {
+ fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
+ "file '%s'. Did you remember to kinit for UID %d?\n",
+ kpath, ctx->uid);
+ ret = -EACCES;
+ goto error;
}
- hdfsBuilderSetForceNewInstance(bld);
- hdfsBuilderSetNameNode(bld, nn_uri);
- if (nn_port) {
- hdfsBuilderSetNameNodePort(bld, nn_port);
- }
- hdfsBuilderSetUserName(bld, user);
- if (hdfsAuthConf == AUTH_CONF_KERBEROS) {
- findKerbTicketCachePath(kpath, sizeof(kpath));
- hdfsBuilderSetKerbTicketCachePath(bld, kpath);
- }
- fs = hdfsBuilderConnect(bld);
- if (NULL == fs) {
- int err = errno;
- ERROR("Unable to create fs for user %s: error code %d", user, err);
- goto done;
+ conn->kPathMtime = st.st_mtim.tv_sec;
+ conn->kPathMtimeNs = st.st_mtim.tv_nsec;
+ hdfsBuilderSetKerbTicketCachePath(bld, kpath);
+ conn->kpath = strdup(kpath);
+ if (!conn->kpath) {
+ fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
+ ret = -ENOMEM;
+ goto error;
}
- if (-1 == insertFs(user, fs)) {
- ERROR("Unable to cache fs for user %s", user);
+ }
+ conn->usrname = strdup(usrname);
+ if (!conn->usrname) {
+ fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
+ ret = -ENOMEM;
+ goto error;
+ }
+ conn->fs = hdfsBuilderConnect(bld);
+ bld = NULL;
+ if (!conn->fs) {
+ ret = errno;
+ fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
+ "error code %d\n", usrname, ret);
+ goto error;
+ }
+ RB_INSERT(hdfsConnTree, &gConnTree, conn);
+ *out = conn;
+ return 0;
+
+error:
+ if (bld) {
+ hdfsFreeBuilder(bld);
+ }
+ if (conn) {
+ free(conn->kpath);
+ free(conn->usrname);
+ free(conn);
+ }
+ return ret;
+}
+
+int fuseConnect(const char *usrname, struct fuse_context *ctx,
+                struct hdfsConn **out)
+{
+  struct hdfsConn *found;
+  int rc, ticks;
+
+  pthread_mutex_lock(&gConnMutex);
+  found = hdfsConnFind(usrname);
+  if (found == NULL) {
+    /* No cached connection for this user: create one and cache it. */
+    rc = fuseNewConnect(usrname, ctx, &found);
+    if (rc != 0) {
+      pthread_mutex_unlock(&gConnMutex);
+      fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
+              "error code %d\n", usrname, rc);
+      return rc;
}
}
+  /* Every use re-arms the expiry countdown: the expiry period rounded up to
+   * whole timer ticks, and never fewer than two ticks. */
+  ticks = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
+  if (ticks < 2) {
+    ticks = 2;
+  }
+  found->refcnt++;
+  found->expirationCount = ticks;
+  pthread_mutex_unlock(&gConnMutex);
+  *out = found;
+  return 0;
+}
-done:
- ret = pthread_mutex_unlock(&tableMutex);
- assert(0 == ret);
- free(user);
- return fs;
+/**
+ * Get a libhdfs connection for the user of the current FUSE request.
+ *
+ * The username is looked up from the UID in the FUSE thread context and the
+ * work is then delegated to fuseConnect.
+ *
+ * @param conn (out param) The HDFS connection
+ *
+ * @return 0 on success; error code otherwise (-EIO if the UID cannot be
+ *         mapped to a username)
+ */
+int fuseConnectAsThreadUid(struct hdfsConn **conn)
+{
+  struct fuse_context *ctx;
+  char *usrname;
+  int ret;
+
+  ctx = fuse_get_context();
+  usrname = getUsername(ctx->uid);
+  if (!usrname) {
+    /* fuseConnect hands the name to strcmp and strdup, neither of which
+     * accepts NULL, so fail here instead. */
+    fprintf(stderr, "fuseConnectAsThreadUid: unable to determine the "
+            "username for UID %ld\n", (long)ctx->uid);
+    return -EIO;
+  }
+  ret = fuseConnect(usrname, ctx, conn);
+  free(usrname);
+  return ret;
}
-/*
- * We currently cache a fs handle per-user in this module rather
- * than use the FileSystem cache in the java client. Therefore
- * we do not disconnect the fs handle here.
- */
-int doDisconnect(hdfsFS fs) {
+/**
+ * Test whether we can connect to the HDFS cluster.
+ *
+ * With Kerberos authentication configured the test is skipped and reported
+ * as successful.  Otherwise a connection is opened as "root" and released
+ * again.
+ *
+ * @return 0 on success; error code otherwise
+ */
+int fuseConnectTest(void)
+{
+  int ret;
+  struct hdfsConn *conn;
+
+  if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
+    // TODO: call some method which can tell us whether the FS exists.  In order
+    // to implement this, we have to add a method to FileSystem in order to do
+    // this without valid Kerberos authentication.  See HDFS-3674 for details.
+    return 0;
+  }
+  /* Go through fuseConnect rather than fuseNewConnect: it takes gConnMutex
+   * around the tree update and takes the reference that hdfsConnRelease drops
+   * below.  Passing a NULL context is fine here because the context is only
+   * consulted for Kerberos connections. */
+  ret = fuseConnect("root", NULL, &conn);
+  if (ret) {
+    fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
+    return ret;
+  }
+  hdfsConnRelease(conn);
return 0;
}
+
+struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
+{
+  /* Plain accessor: hand out the cached libhdfs handle. */
+  struct hdfs_internal *fs = conn->fs;
+
+  return fs;
+}
+
+void hdfsConnRelease(struct hdfsConn *conn)
+{
+  int lastUser;
+
+  pthread_mutex_lock(&gConnMutex);
+  conn->refcnt--;
+  lastUser = (conn->refcnt == 0);
+  if (lastUser && conn->condemned) {
+    fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
+            conn->usrname);
+    /* No RB_REMOVE here: a condemned connection was already taken out of
+     * gConnTree when it was condemned, so that no other thread could start
+     * using it.
+     */
+    hdfsConnFree(conn);
+  }
+  pthread_mutex_unlock(&gConnMutex);
+}
+
+/**
+ * Get the monotonic time.
+ *
+ * Unlike the wall-clock time, monotonic time only ever goes forward. If the
+ * user adjusts the time, the monotonic time will not be affected.
+ *
+ * @return The monotonic time
+ */
+static time_t getMonotonicTime(void)
+{
+  struct timespec now;
+
+  /* Without a working monotonic clock the expiry timer cannot run. */
+  if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
+    abort();
+  }
+  return now.tv_sec;
+}
+
+/**
+ * FUSE connection expiration thread
+ *
+ */
+static void* hdfsConnExpiryThread(void *v)
+{
+  time_t deadline = getMonotonicTime() + gTimerPeriod;
+
+  for (;;) {
+    time_t now = getMonotonicTime();
+    int sleepMs;
+
+    if (now >= deadline) {
+      hdfsConnExpiry();
+      deadline = now + gTimerPeriod;
+    }
+    /* poll() with no descriptors is used purely as a millisecond sleep. */
+    sleepMs = (deadline - now) * 1000;
+    poll(NULL, 0, sleepMs);
+  }
+  return NULL;
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h Fri Jul 20 19:15:52 2012
@@ -19,10 +19,72 @@
#ifndef __FUSE_CONNECT_H__
#define __FUSE_CONNECT_H__
-#include "fuse_dfs.h"
+struct fuse_context;
+struct hdfsConn;
+struct hdfs_internal;
-hdfsFS doConnectAsUser(const char *nn_uri, int nn_port);
-int doDisconnect(hdfsFS fs);
-int allocFsTable(void);
+/**
+ * Initialize the fuse connection subsystem.
+ *
+ * This must be called before any of the other functions in this module.
+ *
+ * @param nnUri The NameNode URI
+ * @param port The NameNode port
+ *
+ * @return 0 on success; error code otherwise
+ */
+int fuseConnectInit(const char *nnUri, int port);
+
+/**
+ * Get a libhdfs connection.
+ *
+ * If there is an existing connection, it will be reused. If not, a new one
+ * will be created.
+ *
+ * You must call hdfsConnRelease on the connection you get back!
+ *
+ * @param usrname The username to use
+ * @param ctx The FUSE context to use (contains UID, PID of requestor)
+ * @param out (out param) The HDFS connection
+ *
+ * @return 0 on success; error code otherwise
+ */
+int fuseConnect(const char *usrname, struct fuse_context *ctx,
+ struct hdfsConn **out);
+
+/**
+ * Get a libhdfs connection.
+ *
+ * The same as fuseConnect, except the username will be determined from the FUSE
+ * thread context.
+ *
+ * @param conn (out param) The HDFS connection
+ *
+ * @return 0 on success; error code otherwise
+ */
+int fuseConnectAsThreadUid(struct hdfsConn **conn);
+
+/**
+ * Test whether we can connect to the HDFS cluster
+ *
+ * @return 0 on success; error code otherwise
+ */
+int fuseConnectTest(void);
+
+/**
+ * Get the hdfsFS associated with an hdfsConn.
+ *
+ * @param conn The hdfsConn
+ *
+ * @return the hdfsFS
+ */
+struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn);
+
+/**
+ * Release an hdfsConn when we're done with it.
+ *
+ * @param conn The hdfsConn
+ */
+void hdfsConnRelease(struct hdfsConn *conn);
#endif
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h Fri Jul 20 19:15:52 2012
@@ -31,8 +31,6 @@
//
typedef struct dfs_context_struct {
int debug;
- char *nn_uri;
- int nn_port;
int read_only;
int usetrash;
int direct_io;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c Fri Jul 20 19:15:52 2012
@@ -65,8 +65,19 @@ static struct fuse_operations dfs_oper =
.truncate = dfs_truncate,
};
+/**
+ * Print the environment variables that most often explain a failure to
+ * start (LD_LIBRARY_PATH and CLASSPATH) to stderr, one per line.
+ */
+static void print_env_vars(void)
+{
+  const char *cp = getenv("CLASSPATH");
+  const char *ld = getenv("LD_LIBRARY_PATH");
+
+  /* Each message ends with a newline so the two values do not run together. */
+  fprintf(stderr, "LD_LIBRARY_PATH=%s\n", ld == NULL ? "NULL" : ld);
+  fprintf(stderr, "CLASSPATH=%s\n", cp == NULL ? "NULL" : cp);
+}
+
int main(int argc, char *argv[])
{
+ int ret;
+
umask(0);
extern const char *program;
@@ -106,24 +117,22 @@ int main(int argc, char *argv[])
exit(0);
}
- // Check connection as root
+ ret = fuseConnectInit(options.nn_uri, options.nn_port);
+ if (ret) {
+ ERROR("FATAL: dfs_init: fuseConnInit failed with error %d!", ret);
+ print_env_vars();
+ exit(EXIT_FAILURE);
+ }
if (options.initchecks == 1) {
- hdfsFS tempFS = hdfsConnectAsUser(options.nn_uri, options.nn_port, "root");
- if (NULL == tempFS) {
- const char *cp = getenv("CLASSPATH");
- const char *ld = getenv("LD_LIBRARY_PATH");
- ERROR("FATAL: misconfiguration - cannot connect to HDFS");
- ERROR("LD_LIBRARY_PATH=%s",ld == NULL ? "NULL" : ld);
- ERROR("CLASSPATH=%s",cp == NULL ? "NULL" : cp);
- exit(1);
- }
- if (doDisconnect(tempFS)) {
- ERROR("FATAL: unable to disconnect from test filesystem.");
- exit(1);
+ ret = fuseConnectTest();
+ if (ret) {
+ ERROR("FATAL: dfs_init: fuseConnTest failed with error %d!", ret);
+ print_env_vars();
+ exit(EXIT_FAILURE);
}
}
- int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
+ ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
fuse_opt_free_args(&args);
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h Fri Jul 20 19:15:52 2012
@@ -22,6 +22,8 @@
#include <hdfs.h>
#include <pthread.h>
+struct hdfsConn;
+
/**
*
* dfs_fh_struct is passed around for open files. Fuse provides a hook (the context)
@@ -34,10 +36,10 @@
*/
typedef struct dfs_fh_struct {
hdfsFile hdfsFH;
+ struct hdfsConn *conn;
char *buf;
tSize bufferSize; //what is the size of the buffer we have
off_t buffersStartOffset; //where the buffer starts in the file
- hdfsFS fs; // for reads/writes need to access as the real user
pthread_mutex_t mutex;
} dfs_fh;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c Fri Jul 20 19:15:52 2012
@@ -23,6 +23,8 @@
int dfs_chmod(const char *path, mode_t mode)
{
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
TRACE1("chmod", path)
int ret = 0;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
@@ -31,22 +33,24 @@ int dfs_chmod(const char *path, mode_t m
assert(dfs);
assert('/' == *path);
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect to HDFS");
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
+ fs = hdfsConnGetFs(conn);
- if (hdfsChmod(userFS, path, (short)mode)) {
+ if (hdfsChmod(fs, path, (short)mode)) {
ERROR("Could not chmod %s to %d", path, (int)mode);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chown.c Fri Jul 20 19:15:52 2012
@@ -25,12 +25,12 @@
int dfs_chown(const char *path, uid_t uid, gid_t gid)
{
- TRACE1("chown", path)
-
+ struct hdfsConn *conn = NULL;
int ret = 0;
char *user = NULL;
char *group = NULL;
- hdfsFS userFS = NULL;
+
+ TRACE1("chown", path)
// retrieve dfs specific data
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
@@ -61,14 +61,15 @@ int dfs_chown(const char *path, uid_t ui
}
}
- userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect to HDFS");
+ ret = fuseConnect(user, fuse_get_context(), &conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnect: failed to open a libhdfs connection! "
+ "error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
- if (hdfsChown(userFS, path, user, group)) {
+ if (hdfsChown(hdfsConnGetFs(conn), path, user, group)) {
ret = errno;
ERROR("Could not chown %s to %d:%d: error %d", path, (int)uid, gid, ret);
ret = (ret > 0) ? -ret : -EIO;
@@ -76,16 +77,11 @@ int dfs_chown(const char *path, uid_t ui
}
cleanup:
- if (userFS && doDisconnect(userFS)) {
- ret = -EIO;
- }
- if (user) {
- free(user);
- }
- if (group) {
- free(group);
+ if (conn) {
+ hdfsConnRelease(conn);
}
+ free(user);
+ free(group);
return ret;
-
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_flush.c Fri Jul 20 19:15:52 2012
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_impls.h"
#include "fuse_file_handle.h"
@@ -43,9 +44,7 @@ int dfs_flush(const char *path, struct f
assert(fh);
hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
assert(file_handle);
-
- assert(fh->fs);
- if (hdfsFlush(fh->fs, file_handle) != 0) {
+ if (hdfsFlush(hdfsConnGetFs(fh->conn), file_handle) != 0) {
ERROR("Could not flush %lx for %s\n",(long)file_handle, path);
return -EIO;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_getattr.c Fri Jul 20 19:15:52 2012
@@ -23,22 +23,27 @@
int dfs_getattr(const char *path, struct stat *st)
{
- TRACE1("getattr", path)
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
+ int ret;
+ hdfsFileInfo *info;
+ TRACE1("getattr", path)
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
-
assert(dfs);
assert(path);
assert(st);
- hdfsFS fs = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (NULL == fs) {
- ERROR("Could not connect to %s:%d", dfs->nn_uri, dfs->nn_port);
- return -EIO;
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
-
- int ret = 0;
- hdfsFileInfo *info = hdfsGetPathInfo(fs,path);
+ fs = hdfsConnGetFs(conn);
+
+ info = hdfsGetPathInfo(fs,path);
if (NULL == info) {
ret = -ENOENT;
goto cleanup;
@@ -63,9 +68,8 @@ int dfs_getattr(const char *path, struct
hdfsFreeFileInfo(info,1);
cleanup:
- if (doDisconnect(fs)) {
- ERROR("Could not disconnect from filesystem");
- ret = -EIO;
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_mkdir.c Fri Jul 20 19:15:52 2012
@@ -23,9 +23,12 @@
+/**
+ * FUSE mkdir handler: create the directory `path` in HDFS and then apply
+ * `mode` to it.
+ *
+ * The HDFS connection is obtained with fuseConnectAsThreadUid() and is
+ * released on every exit path that follows the connection attempt.
+ *
+ * @param path  directory to create
+ * @param mode  permission bits, narrowed to short and passed to hdfsChmod()
+ * @return 0 on success; -EACCES from an early check that is only partly
+ *         visible in this hunk; -EIO if no connection could be opened;
+ *         otherwise -errno (or -EIO when errno is not positive) from the
+ *         failing create or chmod call
+ */
int dfs_mkdir(const char *path, mode_t mode)
{
- TRACE1("mkdir", path)
-
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ int ret;
+
+ TRACE1("mkdir", path)
assert(path);
assert(dfs);
@@ -41,29 +44,32 @@ int dfs_mkdir(const char *path, mode_t m
return -EACCES;
}
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
- return -EIO;
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
+ fs = hdfsConnGetFs(conn);
// In theory the create and chmod should be atomic.
- int ret = 0;
- if (hdfsCreateDirectory(userFS, path)) {
+ if (hdfsCreateDirectory(fs, path)) {
ERROR("HDFS could not create directory %s", path);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
- if (hdfsChmod(userFS, path, (short)mode)) {
+ if (hdfsChmod(fs, path, (short)mode)) {
ERROR("Could not chmod %s to %d", path, (int)mode);
ret = (errno > 0) ? -errno : -EIO;
+ // Jump past the success assignment below; falling through would
+ // overwrite the chmod error with 0 and report success to the caller.
+ goto cleanup;
}
+ ret = 0;
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_open.c Fri Jul 20 19:15:52 2012
@@ -21,38 +21,45 @@
#include "fuse_connect.h"
#include "fuse_file_handle.h"
+#include <stdio.h>
+#include <stdlib.h>
+
int dfs_open(const char *path, struct fuse_file_info *fi)
{
- TRACE1("open", path)
-
+ hdfsFS fs = NULL;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ dfs_fh *fh = NULL;
+ int mutexInit = 0, ret;
+
+ TRACE1("open", path)
// check params and the context var
assert(path);
assert('/' == *path);
assert(dfs);
- int ret = 0;
-
// 0x8000 is always passed in and hadoop doesn't like it, so killing it here
// bugbug figure out what this flag is and report problem to Hadoop JIRA
int flags = (fi->flags & 0x7FFF);
// retrieve dfs specific data
- dfs_fh *fh = (dfs_fh*)calloc(1, sizeof (dfs_fh));
- if (fh == NULL) {
+ fh = (dfs_fh*)calloc(1, sizeof (dfs_fh));
+ if (!fh) {
ERROR("Malloc of new file handle failed");
- return -EIO;
+ ret = -EIO;
+ goto error;
}
-
- fh->fs = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (fh->fs == NULL) {
- ERROR("Could not connect to dfs");
- return -EIO;
+ ret = fuseConnectAsThreadUid(&fh->conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto error;
}
+ fs = hdfsConnGetFs(fh->conn);
if (flags & O_RDWR) {
- hdfsFileInfo *info = hdfsGetPathInfo(fh->fs,path);
+ hdfsFileInfo *info = hdfsGetPathInfo(fs, path);
if (info == NULL) {
// File does not exist (maybe?); interpret it as a O_WRONLY
// If the actual error was something else, we'll get it again when
@@ -66,15 +73,23 @@ int dfs_open(const char *path, struct fu
}
}
- if ((fh->hdfsFH = hdfsOpenFile(fh->fs, path, flags, 0, 0, 0)) == NULL) {
+ if ((fh->hdfsFH = hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) {
ERROR("Could not open file %s (errno=%d)", path, errno);
if (errno == 0 || errno == EINTERNAL) {
- return -EIO;
+ ret = -EIO;
+ goto error;
}
- return -errno;
+ ret = -errno;
+ goto error;
}
- pthread_mutex_init(&fh->mutex, NULL);
+ ret = pthread_mutex_init(&fh->mutex, NULL);
+ if (ret) {
+ fprintf(stderr, "dfs_open: error initializing mutex: error %d\n", ret);
+ ret = -EIO;
+ goto error;
+ }
+ mutexInit = 1;
if (fi->flags & O_WRONLY || fi->flags & O_CREAT) {
fh->buf = NULL;
@@ -84,11 +99,27 @@ int dfs_open(const char *path, struct fu
if (NULL == fh->buf) {
ERROR("Could not allocate memory for a read for file %s\n", path);
ret = -EIO;
+ goto error;
}
fh->buffersStartOffset = 0;
fh->bufferSize = 0;
}
fi->fh = (uint64_t)fh;
+ return 0;
+error:
+ if (fh) {
+ if (mutexInit) {
+ pthread_mutex_destroy(&fh->mutex);
+ }
+ free(fh->buf);
+ if (fh->hdfsFH) {
+ hdfsCloseFile(fs, fh->hdfsFH);
+ }
+ if (fh->conn) {
+ hdfsConnRelease(fh->conn);
+ }
+ free(fh);
+ }
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_read.c Fri Jul 20 19:15:52 2012
@@ -16,9 +16,10 @@
* limitations under the License.
*/
+#include "fuse_connect.h"
#include "fuse_dfs.h"
-#include "fuse_impls.h"
#include "fuse_file_handle.h"
+#include "fuse_impls.h"
static size_t min(const size_t x, const size_t y) {
return x < y ? x : y;
@@ -48,9 +49,9 @@ int dfs_read(const char *path, char *buf
assert(fi);
dfs_fh *fh = (dfs_fh*)fi->fh;
+ hdfsFS fs = hdfsConnGetFs(fh->conn);
assert(fh != NULL);
- assert(fh->fs != NULL);
assert(fh->hdfsFH != NULL);
// special case this as simplifies the rest of the logic to know the caller wanted > 0 bytes
@@ -61,7 +62,7 @@ int dfs_read(const char *path, char *buf
if ( size >= dfs->rdbuffer_size) {
int num_read;
size_t total_read = 0;
- while (size - total_read > 0 && (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) {
+ while (size - total_read > 0 && (num_read = hdfsPread(fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) {
total_read += num_read;
}
// if there was an error before satisfying the current read, this logic declares it an error
@@ -98,7 +99,7 @@ int dfs_read(const char *path, char *buf
size_t total_read = 0;
while (dfs->rdbuffer_size - total_read > 0 &&
- (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) {
+ (num_read = hdfsPread(fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) {
total_read += num_read;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_readdir.c Fri Jul 20 19:15:52 2012
@@ -24,25 +24,31 @@
int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
off_t offset, struct fuse_file_info *fi)
{
- TRACE1("readdir", path)
+ int ret;
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ TRACE1("readdir", path)
+
assert(dfs);
assert(path);
assert(buf);
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
- return -EIO;
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
+ fs = hdfsConnGetFs(conn);
// Read dirents. Calling a variant that just returns the final path
// component (HDFS-975) would save us from parsing it out below.
int numEntries = 0;
- hdfsFileInfo *info = hdfsListDirectory(userFS, path, &numEntries);
+ hdfsFileInfo *info = hdfsListDirectory(fs, path, &numEntries);
- int ret = 0;
// NULL means either the directory doesn't exist or maybe IO error.
if (NULL == info) {
ret = (errno > 0) ? -errno : -ENOENT;
@@ -106,11 +112,11 @@ int dfs_readdir(const char *path, void *
}
// free the info pointers
hdfsFreeFileInfo(info,numEntries);
+ ret = 0;
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
- ERROR("Failed to disconnect %d", errno);
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_release.c Fri Jul 20 19:15:52 2012
@@ -52,15 +52,13 @@ int dfs_release (const char *path, struc
assert(fh);
hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
if (NULL != file_handle) {
- if (hdfsCloseFile(fh->fs, file_handle) != 0) {
+ if (hdfsCloseFile(hdfsConnGetFs(fh->conn), file_handle) != 0) {
ERROR("Could not close handle %ld for %s\n",(long)file_handle, path);
ret = -EIO;
}
}
free(fh->buf);
- if (doDisconnect(fh->fs)) {
- ret = -EIO;
- }
+ hdfsConnRelease(fh->conn);
pthread_mutex_destroy(&fh->mutex);
free(fh);
fi->fh = 0;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rename.c Fri Jul 20 19:15:52 2012
@@ -23,10 +23,12 @@
int dfs_rename(const char *from, const char *to)
{
- TRACE1("rename", from)
-
- // retrieve dfs specific data
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ int ret;
+
+ TRACE1("rename", from)
// check params and the context var
assert(from);
@@ -46,23 +48,24 @@ int dfs_rename(const char *from, const c
return -EACCES;
}
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
- return -EIO;
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
-
- int ret = 0;
- if (hdfsRename(userFS, from, to)) {
+ fs = hdfsConnGetFs(conn);
+ if (hdfsRename(fs, from, to)) {
ERROR("Rename %s to %s failed", from, to);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
+ ret = 0;
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
-
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_rmdir.c Fri Jul 20 19:15:52 2012
@@ -25,9 +25,14 @@ extern const char *const TrashPrefixDir;
int dfs_rmdir(const char *path)
{
- TRACE1("rmdir", path)
-
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
+ int ret;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ int numEntries = 0;
+ hdfsFileInfo *info = NULL;
+
+ TRACE1("rmdir", path)
assert(path);
assert(dfs);
@@ -35,42 +40,43 @@ int dfs_rmdir(const char *path)
if (is_protected(path)) {
ERROR("Trying to delete protected directory %s", path);
- return -EACCES;
+ ret = -EACCES;
+ goto cleanup;
}
if (dfs->read_only) {
ERROR("HDFS configured read-only, cannot delete directory %s", path);
- return -EACCES;
- }
-
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
- return -EIO;
+ ret = -EACCES;
+ goto cleanup;
}
- int ret = 0;
- int numEntries = 0;
- hdfsFileInfo *info = hdfsListDirectory(userFS,path,&numEntries);
-
- if (info) {
- hdfsFreeFileInfo(info, numEntries);
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
-
+ fs = hdfsConnGetFs(conn);
+ info = hdfsListDirectory(fs, path, &numEntries);
if (numEntries) {
ret = -ENOTEMPTY;
goto cleanup;
}
- if (hdfsDeleteWithTrash(userFS, path, dfs->usetrash)) {
+ if (hdfsDeleteWithTrash(fs, path, dfs->usetrash)) {
ERROR("Error trying to delete directory %s", path);
ret = -EIO;
goto cleanup;
}
+ ret = 0;
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
+ if (info) {
+ hdfsFreeFileInfo(info, numEntries);
+ }
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_statfs.c Fri Jul 20 19:15:52 2012
@@ -23,9 +23,12 @@
int dfs_statfs(const char *path, struct statvfs *st)
{
- TRACE1("statfs",path)
-
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ int ret;
+
+ TRACE1("statfs",path)
assert(path);
assert(st);
@@ -33,19 +36,18 @@ int dfs_statfs(const char *path, struct
memset(st,0,sizeof(struct statvfs));
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
- return -EIO;
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
+ fs = hdfsConnGetFs(conn);
- const tOffset cap = hdfsGetCapacity(userFS);
- const tOffset used = hdfsGetUsed(userFS);
- const tOffset bsize = hdfsGetDefaultBlockSize(userFS);
-
- if (doDisconnect(userFS)) {
- return -EIO;
- }
+ const tOffset cap = hdfsGetCapacity(fs);
+ const tOffset used = hdfsGetUsed(fs);
+ const tOffset bsize = hdfsGetDefaultBlockSize(fs);
st->f_bsize = bsize;
st->f_frsize = bsize;
@@ -58,6 +60,11 @@ int dfs_statfs(const char *path, struct
st->f_fsid = 1023;
st->f_flag = ST_RDONLY | ST_NOSUID;
st->f_namemax = 1023;
+ ret = 0;
- return 0;
+cleanup:
+ if (conn) {
+ hdfsConnRelease(conn);
+ }
+ return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_truncate.c Fri Jul 20 19:15:52 2012
@@ -28,10 +28,12 @@
*/
int dfs_truncate(const char *path, off_t size)
{
- TRACE1("truncate", path)
-
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ TRACE1("truncate", path)
+
assert(path);
assert('/' == *path);
assert(dfs);
@@ -45,31 +47,33 @@ int dfs_truncate(const char *path, off_t
return ret;
}
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
+ fs = hdfsConnGetFs(conn);
int flags = O_WRONLY | O_CREAT;
hdfsFile file;
- if ((file = (hdfsFile)hdfsOpenFile(userFS, path, flags, 0, 0, 0)) == NULL) {
+ if ((file = (hdfsFile)hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) {
ERROR("Could not connect open file %s", path);
ret = -EIO;
goto cleanup;
}
- if (hdfsCloseFile(userFS, file) != 0) {
+ if (hdfsCloseFile(fs, file) != 0) {
ERROR("Could not close file %s", path);
ret = -EIO;
goto cleanup;
}
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_unlink.c Fri Jul 20 19:15:52 2012
@@ -20,44 +20,51 @@
#include "fuse_impls.h"
#include "fuse_connect.h"
#include "fuse_trash.h"
-extern const char *const TrashPrefixDir;
int dfs_unlink(const char *path)
{
- TRACE1("unlink", path)
-
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
int ret = 0;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ TRACE1("unlink", path)
+
assert(path);
assert(dfs);
assert('/' == *path);
if (is_protected(path)) {
ERROR("Trying to delete protected directory %s", path);
- return -EACCES;
+ ret = -EACCES;
+ goto cleanup;
}
if (dfs->read_only) {
ERROR("HDFS configured read-only, cannot create directory %s", path);
- return -EACCES;
+ ret = -EACCES;
+ goto cleanup;
}
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
- return -EIO;
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
+ fs = hdfsConnGetFs(conn);
- if (hdfsDeleteWithTrash(userFS, path, dfs->usetrash)) {
+ if (hdfsDeleteWithTrash(fs, path, dfs->usetrash)) {
ERROR("Could not delete file %s", path);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
+ ret = 0;
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_utimens.c Fri Jul 20 19:15:52 2012
@@ -22,10 +22,13 @@
int dfs_utimens(const char *path, const struct timespec ts[2])
{
- TRACE1("utimens", path)
+ struct hdfsConn *conn = NULL;
+ hdfsFS fs;
int ret = 0;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+ TRACE1("utimens", path)
+
assert(path);
assert(dfs);
assert('/' == *path);
@@ -33,14 +36,17 @@ int dfs_utimens(const char *path, const
time_t aTime = ts[0].tv_sec;
time_t mTime = ts[1].tv_sec;
- hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
- if (userFS == NULL) {
- ERROR("Could not connect");
- return -EIO;
+ ret = fuseConnectAsThreadUid(&conn);
+ if (ret) {
+ fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+ "connection! error %d.\n", ret);
+ ret = -EIO;
+ goto cleanup;
}
+ fs = hdfsConnGetFs(conn);
- if (hdfsUtime(userFS, path, mTime, aTime)) {
- hdfsFileInfo *info = hdfsGetPathInfo(userFS, path);
+ if (hdfsUtime(fs, path, mTime, aTime)) {
+ hdfsFileInfo *info = hdfsGetPathInfo(fs, path);
if (info == NULL) {
ret = (errno > 0) ? -errno : -ENOENT;
goto cleanup;
@@ -54,10 +60,11 @@ int dfs_utimens(const char *path, const
}
goto cleanup;
}
+ ret = 0;
cleanup:
- if (doDisconnect(userFS)) {
- ret = -EIO;
+ if (conn) {
+ hdfsConnRelease(conn);
}
return ret;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_write.c Fri Jul 20 19:15:52 2012
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_impls.h"
#include "fuse_file_handle.h"
@@ -48,15 +49,15 @@ int dfs_write(const char *path, const ch
pthread_mutex_lock(&fh->mutex);
tSize length = 0;
- assert(fh->fs);
+ hdfsFS fs = hdfsConnGetFs(fh->conn);
- tOffset cur_offset = hdfsTell(fh->fs, file_handle);
+ tOffset cur_offset = hdfsTell(fs, file_handle);
if (cur_offset != offset) {
ERROR("User trying to random access write to a file %d != %d for %s",
(int)cur_offset, (int)offset, path);
ret = -ENOTSUP;
} else {
- length = hdfsWrite(fh->fs, file_handle, buf, size);
+ length = hdfsWrite(fs, file_handle, buf, size);
if (length <= 0) {
ERROR("Could not write all bytes for %s %d != %d (errno=%d)",
path, length, (int)size, errno);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_init.c Fri Jul 20 19:15:52 2012
@@ -78,8 +78,20 @@ void init_protectedpaths(dfs_context *df
dfs->protectedpaths[j] = NULL;
}
+/**
+ * Write a one-line, bracketed summary of the mount options to fp.
+ *
+ * No trailing newline is written. String options that are NULL are printed
+ * as "(NULL)" rather than being handed to %s, which would be undefined
+ * behavior.
+ *
+ * @param fp  stream to print to
+ * @param o   options to print
+ */
+static void dfsPrintOptions(FILE *fp, const struct options *o)
+{
+ // %zd is the standard C99 spelling of the legacy, glibc-only %Zd.
+ fprintf(fp, "[ protected=%s, nn_uri=%s, nn_port=%d, "
+ "debug=%d, read_only=%d, initchecks=%d, "
+ "no_permissions=%d, usetrash=%d, entry_timeout=%d, "
+ "attribute_timeout=%d, rdbuffer_size=%zd, direct_io=%d ]",
+ (o->protected ? o->protected : "(NULL)"),
+ (o->nn_uri ? o->nn_uri : "(NULL)"), o->nn_port,
+ o->debug, o->read_only, o->initchecks,
+ o->no_permissions, o->usetrash, o->entry_timeout,
+ o->attribute_timeout, o->rdbuffer_size, o->direct_io);
+}
-void *dfs_init(void) {
+void *dfs_init(void)
+{
//
// Create a private struct of data we will pass to fuse here and which
// will then be accessible on every call.
@@ -92,15 +104,15 @@ void *dfs_init(void) {
// initialize the context
dfs->debug = options.debug;
- dfs->nn_uri = options.nn_uri;
- dfs->nn_port = options.nn_port;
dfs->read_only = options.read_only;
dfs->usetrash = options.usetrash;
dfs->protectedpaths = NULL;
dfs->rdbuffer_size = options.rdbuffer_size;
dfs->direct_io = options.direct_io;
- INFO("Mounting. nn_uri=%s, nn_port=%d", dfs->nn_uri, dfs->nn_port);
+ fprintf(stderr, "Mounting with options ");
+ dfsPrintOptions(stderr, &options);
+ fprintf(stderr, "\n");
init_protectedpaths(dfs);
assert(dfs->protectedpaths != NULL);
@@ -109,12 +121,6 @@ void *dfs_init(void) {
DEBUG("dfs->rdbuffersize <= 0 = %ld", dfs->rdbuffer_size);
dfs->rdbuffer_size = 32768;
}
-
- if (0 != allocFsTable()) {
- ERROR("FATAL: could not allocate ");
- exit(1);
- }
-
return (void*)dfs;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c Fri Jul 20 19:15:52 2012
@@ -225,7 +225,7 @@ done:
*
* @return 0 on success; error code otherwise
*/
-static int hadoopConfSet(JNIEnv *env, jobject jConfiguration,
+static int hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value)
{
int ret;
@@ -283,7 +283,7 @@ static int jStrToCstr(JNIEnv *env, jstri
return 0;
}
-static int hadoopConfGet(JNIEnv *env, jobject jConfiguration,
+static int hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
const char *key, char **val)
{
int ret;
@@ -301,7 +301,7 @@ static int hadoopConfGet(JNIEnv *env, jo
HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
JPARAM(JAVA_STRING)), jkey);
if (ret) {
- snprintf(buf, sizeof(buf), "hadoopConfGet(%s)", key);
+ snprintf(buf, sizeof(buf), "hadoopConfGetStr(%s)", key);
ret = errnoFromException(jExc, env, buf);
goto done;
}
@@ -321,7 +321,7 @@ done:
return ret;
}
-int hdfsConfGet(const char *key, char **val)
+int hdfsConfGetStr(const char *key, char **val)
{
JNIEnv *env;
int ret;
@@ -339,19 +339,67 @@ int hdfsConfGet(const char *key, char **
ret = EINTERNAL;
goto done;
}
- ret = hadoopConfGet(env, jConfiguration, key, val);
+ ret = hadoopConfGetStr(env, jConfiguration, key, val);
+done:
+ destroyLocalReference(env, jConfiguration);
if (ret)
+ errno = ret;
+ return ret;
+}
+/* hadoopConfGetInt: read an integer setting named by key from the Configuration object jConfiguration through JNI; returns 0 on success, nonzero on failure. */
+static int hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
+ const char *key, int32_t *val) /* val is both input (forwarded to getInt) and output (overwritten on success) */
+{
+ int ret;
+ jthrowable jExc = NULL;
+ jvalue jVal;
+ jstring jkey = NULL;
+ char buf[1024]; /* scratch label handed to errnoFromException */
+
+ jkey = (*env)->NewStringUTF(env, key); /* Java string built from the C key */
+ if (!jkey) {
+ (*env)->ExceptionDescribe(env);
+ return ENOMEM; /* string creation failed; nothing else to release yet */
+ }
+ ret = invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration,
+ HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
+ jkey, (jint)(*val)); /* current *val is passed as the second argument, presumably the Java-side default -- TODO confirm against Configuration.getInt */
+ destroyLocalReference(env, jkey); /* released before the error check so both outcomes drop the reference */
+ if (ret) {
+ snprintf(buf, sizeof(buf), "hadoopConfGetInt(%s)", key);
+ return errnoFromException(jExc, env, buf); /* error code is whatever errnoFromException derives from jExc */
+ }
+ *val = jVal.i; /* reached only when invokeMethod returned 0 */
+ return 0;
+}
+/* hdfsConfGetInt: public entry point; builds a fresh Configuration object and asks hadoopConfGetInt for key. Returns 0 on success, otherwise a nonzero code that is also stored in errno. */
+int hdfsConfGetInt(const char *key, int32_t *val)
+{
+ JNIEnv *env;
+ int ret;
+ jobject jConfiguration = NULL; /* stays NULL until construction succeeds */
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ ret = EINTERNAL; /* no JNI environment available */
+ goto done;
+ }
+ jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); /* no-argument constructor */
+ if (jConfiguration == NULL) {
+ fprintf(stderr, "Can't construct instance of class "
+ "org.apache.hadoop.conf.Configuration\n");
+ ret = EINTERNAL;
goto done;
- ret = 0;
+ }
+ ret = hadoopConfGetInt(env, jConfiguration, key, val); /* ret carries the helper result straight to the cleanup path */
done:
- if (jConfiguration)
- destroyLocalReference(env, jConfiguration);
+ destroyLocalReference(env, jConfiguration); /* NOTE(review): also reached with env == NULL and jConfiguration == NULL; assumes destroyLocalReference tolerates that -- confirm */
if (ret)
errno = ret;
return ret;
}
-void hdfsConfFree(char *val)
+void hdfsConfStrFree(char *val) /* releases val with free(); per hdfs.h this is for strings obtained from hdfsConfGetStr */
{
free(val);
}
@@ -583,7 +631,7 @@ hdfsFS hdfsBuilderConnect(struct hdfsBui
}
if (bld->kerbTicketCachePath) {
- ret = hadoopConfSet(env, jConfiguration,
+ ret = hadoopConfSetStr(env, jConfiguration,
KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
if (ret)
goto done;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h Fri Jul 20 19:15:52 2012
@@ -220,21 +220,33 @@ extern "C" {
* Get a configuration string.
*
* @param key The key to find
- * @param val (out param) The value. This will be NULL if the
+ * @param val (out param) The value. This will be set to NULL if the
* key isn't found. You must free this string with
- * hdfsConfFree.
+ * hdfsConfStrFree.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
- int hdfsConfGet(const char *key, char **val);
+ int hdfsConfGetStr(const char *key, char **val);
/**
- * Free a configuration string found with hdfsConfGet.
+ * Get a configuration integer.
*
- * @param val A configuration string obtained from hdfsConfGet
+ * @param key The key to find
+ * @param val (out param) The value. This will NOT be changed if the
+ * key isn't found.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ * Failure to find the key is not an error.
+ */
+ int hdfsConfGetInt(const char *key, int32_t *val);
+
+ /**
+ * Free a configuration string found with hdfsConfGetStr.
+ *
+ * @param val A configuration string obtained from hdfsConfGetStr
*/
- void hdfsConfFree(char *val);
+ void hdfsConfStrFree(char *val);
/**
* hdfsDisconnect - Disconnect from the hdfs file system.
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c?rev=1363904&r1=1363903&r2=1363904&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c Fri Jul 20 19:15:52 2012
@@ -412,6 +412,7 @@ char *classNameOfObject(jobject jobj, JN
return newstr;
}
+
/**
* Get the global JNI environemnt.
*
@@ -500,6 +501,11 @@ static JNIEnv* getGlobalJNIEnv(void)
"with error: %d\n", rv);
return NULL;
}
+ if (invokeMethod(env, NULL, NULL, STATIC, NULL,
+ "org/apache/hadoop/fs/FileSystem",
+ "loadFileSystems", "()V") != 0) {
+ (*env)->ExceptionDescribe(env);
+ }
}
else {
//Attach this thread to the VM