You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2009/03/19 20:38:12 UTC
svn commit: r756152 - in /hadoop/core/trunk: CHANGES.txt
src/c++/libhdfs/hdfs.c src/c++/libhdfs/hdfs.h src/c++/libhdfs/hdfs_test.c
src/core/org/apache/hadoop/fs/FileSystem.java
src/test/org/apache/hadoop/fs/TestFileSystem.java
Author: dhruba
Date: Thu Mar 19 19:38:12 2009
New Revision: 756152
URL: http://svn.apache.org/viewvc?rev=756152&view=rev
Log:
HADOOP-4655. New method FileSystem.newInstance() that always returns
a newly allocated FileSystem object. (dhruba)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/c++/libhdfs/hdfs.c
hadoop/core/trunk/src/c++/libhdfs/hdfs.h
hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c
hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 19 19:38:12 2009
@@ -175,6 +175,9 @@
SequenceFile.CompressedBytes and SequenceFile.UncompressedBytes.
(hong tang via mahadev)
+ HADOOP-4655. New method FileSystem.newInstance() that always returns
+ a newly allocated FileSystem object. (dhruba)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/core/trunk/src/c++/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/libhdfs/hdfs.c?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/libhdfs/hdfs.c (original)
+++ hadoop/core/trunk/src/c++/libhdfs/hdfs.c Thu Mar 19 19:38:12 2009
@@ -158,10 +158,15 @@
hdfsFS hdfsConnect(const char* host, tPort port) {
- // conect with NULL as user name/groups
+ // connect with NULL as user name/groups
return hdfsConnectAsUser(host, port, NULL, NULL, 0);
}
+/**
+ * hdfsConnectNewInstance - Like hdfsConnect, but always return a new
+ * FileSystem handle. Delegates to hdfsConnectAsUserNewInstance with no
+ * user name and no groups.
+ */
+hdfsFS hdfsConnectNewInstance(const char* host, tPort port) {
+    // connect with NULL as user name/groups
+    hdfsFS fs = hdfsConnectAsUserNewInstance(host, port, NULL, NULL, 0);
+    return fs;
+}
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char **groups, int groups_size )
{
@@ -345,6 +350,187 @@
}
+/**
+ * hdfsConnectAsUserNewInstance - Connect to a file system as a specific user,
+ * always returning a new FileSystem handle.
+ *
+ * @param host NULL selects the local file system (FileSystem.newInstanceLocal);
+ * "default" together with port 0 selects FileSystem.newInstance(conf);
+ * any other value is turned into an hdfs://host:port URI.
+ * @param port The port used when building the hdfs:// URI.
+ * @param user The user name; when NULL no user/group information is stored
+ * in the configuration.
+ * @param groups The groups of the user; must be non-NULL when user is given.
+ * @param groups_size Number of entries in groups; must be > 0 when user is given.
+ * @return A global reference to the new FileSystem object, or NULL on
+ * failure (errno is set on the failure paths below).
+ */
+hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char **groups, int groups_size )
+{
+    // JAVA EQUIVALENT:
+    //  FileSystem fs = FileSystem.newInstance(new Configuration());
+    //  return fs;
+
+    JNIEnv *env = 0;
+    jobject jConfiguration = NULL;
+    jobject jFS = NULL;
+    jobject jURI = NULL;
+    jstring jURIString = NULL;
+    jvalue jVal;
+    jthrowable jExc = NULL;
+    char *cURI = 0;
+    jobject gFsRef = NULL;
+
+
+    //Get the JNIEnv* corresponding to current thread
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    //Create the org.apache.hadoop.conf.Configuration object
+    jConfiguration =
+        constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
+
+    if (jConfiguration == NULL) {
+        fprintf(stderr, "Can't construct instance of class "
+                "org.apache.hadoop.conf.Configuration\n");
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    if (user != NULL) {
+
+        if (groups == NULL || groups_size <= 0) {
+            fprintf(stderr, "ERROR: groups must not be empty/null\n");
+            errno = EINVAL;
+            // do not leak the configuration created above
+            destroyLocalReference(env, jConfiguration);
+            return NULL;
+        }
+
+        jstring jUserString = (*env)->NewStringUTF(env, user);
+        jarray jGroups = constructNewArrayString(env, &jExc, groups, groups_size);
+        if (jGroups == NULL) {
+            errno = EINTERNAL;
+            fprintf(stderr, "ERROR: could not construct groups array\n");
+            // release everything created so far before bailing out
+            destroyLocalReference(env, jConfiguration);
+            destroyLocalReference(env, jUserString);
+            return NULL;
+        }
+
+        jobject jUgi;
+        if ((jUgi = constructNewObjectOfClass(env, &jExc, HADOOP_UNIX_USER_GROUP_INFO, JMETHOD2(JPARAM(JAVA_STRING), JARRPARAM(JAVA_STRING), JAVA_VOID), jUserString, jGroups)) == NULL) {
+            fprintf(stderr,"failed to construct hadoop user unix group info object\n");
+            errno = errnoFromException(jExc, env, HADOOP_UNIX_USER_GROUP_INFO,
+                                       "init");
+            destroyLocalReference(env, jConfiguration);
+            destroyLocalReference(env, jUserString);
+            if (jGroups != NULL) {
+                destroyLocalReference(env, jGroups);
+            }
+            return NULL;
+        }
+#define USE_UUGI
+#ifdef USE_UUGI
+
+        // UnixUserGroupInformation.UGI_PROPERTY_NAME
+        jstring jAttrString = (*env)->NewStringUTF(env,"hadoop.job.ugi");
+
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_UNIX_USER_GROUP_INFO, "saveToConf",
+                         JMETHOD3(JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_UNIX_USER_GROUP_INFO), JAVA_VOID),
+                         jConfiguration, jAttrString, jUgi) != 0) {
+            // report the call that actually failed (saveToConf on the UGI class)
+            errno = errnoFromException(jExc, env, HADOOP_UNIX_USER_GROUP_INFO,
+                                       "saveToConf");
+            destroyLocalReference(env, jConfiguration);
+            destroyLocalReference(env, jUserString);
+            if (jGroups != NULL) {
+                destroyLocalReference(env, jGroups);
+            }
+            destroyLocalReference(env, jUgi);
+            destroyLocalReference(env, jAttrString);
+            return NULL;
+        }
+
+        destroyLocalReference(env, jUserString);
+        destroyLocalReference(env, jGroups);
+        destroyLocalReference(env, jUgi);
+        destroyLocalReference(env, jAttrString);
+    }
+#else
+
+    // what does "current" mean in the context of libhdfs ? does it mean for the last hdfs connection we used?
+    // that's why this code cannot be activated. We know the above use of the conf object should work well with
+    // multiple connections.
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_USER_GROUP_INFO, "setCurrentUGI",
+                         JMETHOD1(JPARAM(HADOOP_USER_GROUP_INFO), JAVA_VOID),
+                         jUgi) != 0) {
+            errno = errnoFromException(jExc, env, HADOOP_USER_GROUP_INFO,
+                                       "setCurrentUGI");
+            destroyLocalReference(env, jConfiguration);
+            destroyLocalReference(env, jUserString);
+            if (jGroups != NULL) {
+                destroyLocalReference(env, jGroups);
+            }
+            destroyLocalReference(env, jUgi);
+            return NULL;
+        }
+
+        destroyLocalReference(env, jUserString);
+        destroyLocalReference(env, jGroups);
+        destroyLocalReference(env, jUgi);
+    }
+#endif
+    //Check what type of FileSystem the caller wants...
+    if (host == NULL) {
+        // fs = FileSystem::newInstanceLocal(conf);
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstanceLocal",
+                         JMETHOD1(JPARAM(HADOOP_CONF),
+                                  JPARAM(HADOOP_LOCALFS)),
+                         jConfiguration) != 0) {
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+                                       "FileSystem::newInstanceLocal");
+            goto done;
+        }
+        jFS = jVal.l;
+    }
+    else if (!strcmp(host, "default") && port == 0) {
+        // fs = FileSystem::newInstance(conf);
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
+                         HADOOP_FS, "newInstance",
+                         JMETHOD1(JPARAM(HADOOP_CONF),
+                                  JPARAM(HADOOP_FS)),
+                         jConfiguration) != 0) {
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+                                       "FileSystem::newInstance");
+            goto done;
+        }
+        jFS = jVal.l;
+    }
+    else {
+        // fs = FileSystem::newInstance(URI, conf);
+        // 16 extra bytes are meant to hold "hdfs://", ':', the decimal port
+        // and the terminating NUL; snprintf below bounds the write regardless.
+        const size_t cURISize = strlen(host) + 16;
+        cURI = malloc(cURISize);
+        if (cURI == NULL) {
+            fprintf(stderr, "ERROR: could not allocate memory for the URI\n");
+            errno = ENOMEM;
+            goto done;
+        }
+        snprintf(cURI, cURISize, "hdfs://%s:%d", host, (int)(port));
+
+        jURIString = (*env)->NewStringUTF(env, cURI);
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
+                         "create", "(Ljava/lang/String;)Ljava/net/URI;",
+                         jURIString) != 0) {
+            errno = errnoFromException(jExc, env, "java.net.URI::create");
+            goto done;
+        }
+        jURI = jVal.l;
+
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstance",
+                         JMETHOD2(JPARAM(JAVA_NET_URI),
+                                  JPARAM(HADOOP_CONF), JPARAM(HADOOP_FS)),
+                         jURI, jConfiguration) != 0) {
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+                                       "Filesystem::newInstance(URI, Configuration)");
+            goto done;
+        }
+
+        jFS = jVal.l;
+    }
+
+  done:
+
+    // Release unnecessary local references
+    destroyLocalReference(env, jConfiguration);
+    destroyLocalReference(env, jURIString);
+    destroyLocalReference(env, jURI);
+
+    if (cURI) free(cURI);
+
+    /* Create a global reference for this fs */
+    if (jFS) {
+        gFsRef = (*env)->NewGlobalRef(env, jFS);
+        destroyLocalReference(env, jFS);
+    }
+
+    return gFsRef;
+}
int hdfsDisconnect(hdfsFS fs)
{
Modified: hadoop/core/trunk/src/c++/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/libhdfs/hdfs.h?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/libhdfs/hdfs.h (original)
+++ hadoop/core/trunk/src/c++/libhdfs/hdfs.h Thu Mar 19 19:38:12 2009
@@ -90,7 +90,6 @@
};
typedef struct hdfsFile_internal* hdfsFile;
-
/**
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
* Connect to the hdfs.
@@ -121,6 +120,14 @@
hdfsFS hdfsConnect(const char* host, tPort port);
+ /**
+ * These are the same as hdfsConnectAsUser and hdfsConnect, except that every invocation returns a new FileSystem handle.
+ * Applications should call hdfsDisconnect once for every call to hdfsConnectAsUserNewInstance or hdfsConnectNewInstance.
+ */
+ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char *groups[], int groups_size );
+ hdfsFS hdfsConnectNewInstance(const char* host, tPort port);
+ hdfsFS hdfsConnectPath(const char* uri);
+
/**
* hdfsDisconnect - Disconnect from the hdfs file system.
* Disconnect from hdfs.
Modified: hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/libhdfs/hdfs_test.c?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c (original)
+++ hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c Thu Mar 19 19:38:12 2009
@@ -52,13 +52,13 @@
int main(int argc, char **argv) {
- hdfsFS fs = hdfsConnect("default", 0);
+ hdfsFS fs = hdfsConnectNewInstance("default", 0);
if(!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
exit(-1);
}
- hdfsFS lfs = hdfsConnect(NULL, 0);
+ hdfsFS lfs = hdfsConnectNewInstance(NULL, 0);
if(!lfs) {
fprintf(stderr, "Oops! Failed to connect to 'local' hdfs!\n");
exit(-1);
@@ -401,7 +401,7 @@
groups[0] = "users";
groups[1] = "nobody";
- fs = hdfsConnectAsUser("default", 0, tuser, groups, 2);
+ fs = hdfsConnectAsUserNewInstance("default", 0, tuser, groups, 2);
if(!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs as user %s!\n",tuser);
exit(-1);
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java Thu Mar 19 19:38:12 2009
@@ -185,6 +185,47 @@
return CACHE.get(uri, conf);
}
+  /**
+   * Returns a newly created FileSystem for this URI's scheme and authority.
+   * The scheme of the URI determines a configuration property name,
+   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
+   * The entire URI is passed to the FileSystem instance's initialize method.
+   * A URI without a scheme resolves to the default file system; a URI without
+   * an authority resolves to the default URI when that URI has the same
+   * scheme and carries an authority.
+   * This always returns a new FileSystem object.
+   *
+   * @param uri the URI identifying the file system
+   * @param conf the configuration to use
+   * @return a new FileSystem object
+   */
+  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
+    final String scheme = uri.getScheme();
+    if (scheme == null) {
+      // no scheme: use default FS
+      return newInstance(conf);
+    }
+
+    if (uri.getAuthority() == null) {
+      // no authority: prefer the default URI if its scheme matches
+      // and it has an authority of its own
+      final URI defaultUri = getDefaultUri(conf);
+      final boolean sameScheme = scheme.equals(defaultUri.getScheme());
+      if (sameScheme && defaultUri.getAuthority() != null) {
+        return newInstance(defaultUri, conf);
+      }
+    }
+
+    return CACHE.getUnique(uri, conf);
+  }
+
+  /**
+   * Returns a unique configured filesystem implementation, resolved from the
+   * default URI of the given configuration.
+   * This always returns a new FileSystem object.
+   *
+   * @param conf the configuration to use
+   * @return a new FileSystem object
+   */
+  public static FileSystem newInstance(Configuration conf) throws IOException {
+    final URI defaultUri = getDefaultUri(conf);
+    return newInstance(defaultUri, conf);
+  }
+
+  /**
+   * Get a unique local file system object.
+   * This always returns a new FileSystem object.
+   *
+   * @param conf the configuration to configure the file system with
+   * @return a LocalFileSystem
+   */
+  public static LocalFileSystem newInstanceLocal(Configuration conf)
+    throws IOException {
+    final FileSystem local = newInstance(LocalFileSystem.NAME, conf);
+    return (LocalFileSystem) local;
+  }
+
private static class ClientFinalizer extends Thread {
public synchronized void run() {
try {
@@ -1360,8 +1401,21 @@
static class Cache {
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
+ /** A variable that makes all objects in the cache unique */
+ private static AtomicLong unique = new AtomicLong(1);
+
synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
+ return getInternal(uri, conf, key);
+ }
+
+    /**
+     * Looks up a FileSystem under a key tagged with the next value of the
+     * {@code unique} counter. The objects inserted into the cache using this
+     * method are all unique.
+     */
+    synchronized FileSystem getUnique(URI uri, Configuration conf) throws IOException{
+      final long serial = unique.getAndIncrement();
+      return getInternal(uri, conf, new Key(uri, conf, serial));
+    }
+
+ private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
FileSystem fs = map.get(key);
if (fs == null) {
fs = createFileSystem(uri, conf);
@@ -1416,10 +1470,16 @@
final String scheme;
final String authority;
final String username;
+ final long unique; // an artificial way to make a key unique
Key(URI uri, Configuration conf) throws IOException {
+ this(uri, conf, 0);
+ }
+
+ Key(URI uri, Configuration conf, long unique) throws IOException {
scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
+ this.unique = unique;
UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
if (ugi == null) {
try {
@@ -1433,7 +1493,7 @@
/** {@inheritDoc} */
public int hashCode() {
- return (scheme + authority + username).hashCode();
+ return (scheme + authority + username).hashCode() + (int)unique;
}
static boolean isEqual(Object a, Object b) {
@@ -1449,7 +1509,8 @@
Key that = (Key)obj;
return isEqual(this.scheme, that.scheme)
&& isEqual(this.authority, that.authority)
- && isEqual(this.username, that.username);
+ && isEqual(this.username, that.username)
+ && (this.unique == that.unique);
}
return false;
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 19 19:38:12 2009
@@ -618,4 +618,20 @@
assertTrue(map.containsKey(lowercaseCachekey2));
}
+
+  /**
+   * Checks that FileSystem.get hands back the same object on repeated calls,
+   * while FileSystem.newInstance hands back a distinct object each time.
+   * The instances obtained from newInstance are closed even when an
+   * assertion fails or the second creation throws.
+   *
+   * NOTE(review): the parameters are unused here, and a static method taking
+   * arguments is presumably not picked up by the test runner automatically -
+   * confirm how this method is meant to be invoked.
+   */
+  public static void testFsUniqueness(long megaBytes, int numFiles, long seed)
+    throws Exception {
+
+    // multiple invocations of FileSystem.get return the same object.
+    FileSystem fs1 = FileSystem.get(conf);
+    FileSystem fs2 = FileSystem.get(conf);
+    assertTrue(fs1 == fs2);
+
+    // multiple invocations of FileSystem.newInstance return different objects
+    FileSystem unique1 = null;
+    FileSystem unique2 = null;
+    try {
+      unique1 = FileSystem.newInstance(conf);
+      unique2 = FileSystem.newInstance(conf);
+      assertTrue(unique1 != unique2 && !unique1.equals(unique2));
+    } finally {
+      // close both instances, even if closing the first one throws
+      try {
+        if (unique1 != null) {
+          unique1.close();
+        }
+      } finally {
+        if (unique2 != null) {
+          unique2.close();
+        }
+      }
+    }
+  }
}