You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2009/03/19 20:38:12 UTC

svn commit: r756152 - in /hadoop/core/trunk: CHANGES.txt src/c++/libhdfs/hdfs.c src/c++/libhdfs/hdfs.h src/c++/libhdfs/hdfs_test.c src/core/org/apache/hadoop/fs/FileSystem.java src/test/org/apache/hadoop/fs/TestFileSystem.java

Author: dhruba
Date: Thu Mar 19 19:38:12 2009
New Revision: 756152

URL: http://svn.apache.org/viewvc?rev=756152&view=rev
Log:
HADOOP-4655. New method FileSystem.newInstance() that always returns
a newly allocated FileSystem object. (dhruba)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/c++/libhdfs/hdfs.c
    hadoop/core/trunk/src/c++/libhdfs/hdfs.h
    hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 19 19:38:12 2009
@@ -175,6 +175,9 @@
     SequenceFile.CompressedBytes and SequenceFile.UncompressedBytes.
     (hong tang via mahadev)
 
+    HADOOP-4655. New method FileSystem.newInstance() that always returns
+    a newly allocated FileSystem object. (dhruba)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/core/trunk/src/c++/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/libhdfs/hdfs.c?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/libhdfs/hdfs.c (original)
+++ hadoop/core/trunk/src/c++/libhdfs/hdfs.c Thu Mar 19 19:38:12 2009
@@ -158,10 +158,15 @@
 
 
 hdfsFS hdfsConnect(const char* host, tPort port) {
-  // conect with NULL as user name/groups
+  // connect with NULL as user name/groups
   return hdfsConnectAsUser(host, port, NULL, NULL, 0);
 }
 
+/** Always return a new FileSystem handle */
+hdfsFS hdfsConnectNewInstance(const char* host, tPort port) {
+  // connect with NULL as user name/groups
+  return hdfsConnectAsUserNewInstance(host, port, NULL, NULL, 0);
+}
 
 hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char **groups, int groups_size )
 {
@@ -345,6 +350,187 @@
 }
 
 
+/** Always return a new FileSystem handle */
+hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char **groups, int groups_size )
+{
+    // JAVA EQUIVALENT:
+    //  FileSystem fs = FileSystem.get(new Configuration());
+    //  return fs;
+
+    JNIEnv *env = 0;
+    jobject jConfiguration = NULL;
+    jobject jFS = NULL;
+    jobject jURI = NULL;
+    jstring jURIString = NULL;
+    jvalue  jVal;
+    jthrowable jExc = NULL;
+    char    *cURI = 0;
+    jobject gFsRef = NULL;
+
+
+    //Get the JNIEnv* corresponding to current thread
+    env = getJNIEnv();
+    if (env == NULL) {
+      errno = EINTERNAL;
+      return NULL;
+    }
+
+    //Create the org.apache.hadoop.conf.Configuration object
+    jConfiguration =
+        constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
+
+    if (jConfiguration == NULL) {
+        fprintf(stderr, "Can't construct instance of class "
+                "org.apache.hadoop.conf.Configuration\n");
+        errno = EINTERNAL;
+        return NULL;
+    }
+ 
+    if (user != NULL) {
+
+      if (groups == NULL || groups_size <= 0) {
+        fprintf(stderr, "ERROR: groups must not be empty/null\n");
+        errno = EINVAL;
+        return NULL;
+      }
+
+      jstring jUserString = (*env)->NewStringUTF(env, user);
+      jarray jGroups = constructNewArrayString(env, &jExc, groups, groups_size);
+      if (jGroups == NULL) {
+        errno = EINTERNAL;
+        fprintf(stderr, "ERROR: could not construct groups array\n");
+        return NULL;
+      }
+
+      jobject jUgi;
+      if ((jUgi = constructNewObjectOfClass(env, &jExc, HADOOP_UNIX_USER_GROUP_INFO, JMETHOD2(JPARAM(JAVA_STRING), JARRPARAM(JAVA_STRING), JAVA_VOID), jUserString, jGroups)) == NULL) {
+        fprintf(stderr,"failed to construct hadoop user unix group info object\n");
+        errno = errnoFromException(jExc, env, HADOOP_UNIX_USER_GROUP_INFO,
+                                   "init");
+        destroyLocalReference(env, jConfiguration);
+        destroyLocalReference(env, jUserString);
+        if (jGroups != NULL) {
+          destroyLocalReference(env, jGroups);
+        }          
+        return NULL;
+      }
+#define USE_UUGI
+#ifdef USE_UUGI
+
+      // UnixUserGroupInformation.UGI_PROPERTY_NAME
+      jstring jAttrString = (*env)->NewStringUTF(env,"hadoop.job.ugi");
+      
+      if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_UNIX_USER_GROUP_INFO, "saveToConf",
+                       JMETHOD3(JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_UNIX_USER_GROUP_INFO), JAVA_VOID),
+                       jConfiguration, jAttrString, jUgi) != 0) {
+        errno = errnoFromException(jExc, env, HADOOP_FSPERM,
+                                   "init");
+        destroyLocalReference(env, jConfiguration);
+        destroyLocalReference(env, jUserString);
+        if (jGroups != NULL) {
+          destroyLocalReference(env, jGroups);
+        }          
+        destroyLocalReference(env, jUgi);
+        return NULL;
+      }
+
+      destroyLocalReference(env, jUserString);
+      destroyLocalReference(env, jGroups);
+      destroyLocalReference(env, jUgi);
+    }
+#else
+    
+    // what does "current" mean in the context of libhdfs ? does it mean for the last hdfs connection we used?
+    // that's why this code cannot be activated. We know the above use of the conf object should work well with 
+    // multiple connections.
+      if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_USER_GROUP_INFO, "setCurrentUGI",
+                       JMETHOD1(JPARAM(HADOOP_USER_GROUP_INFO), JAVA_VOID),
+                       jUgi) != 0) {
+        errno = errnoFromException(jExc, env, HADOOP_USER_GROUP_INFO,
+                                   "setCurrentUGI");
+        destroyLocalReference(env, jConfiguration);
+        destroyLocalReference(env, jUserString);
+        if (jGroups != NULL) {
+          destroyLocalReference(env, jGroups);
+        }          
+        destroyLocalReference(env, jUgi);
+        return NULL;
+      }
+
+      destroyLocalReference(env, jUserString);
+      destroyLocalReference(env, jGroups);
+      destroyLocalReference(env, jUgi);
+    }
+#endif      
+    //Check what type of FileSystem the caller wants...
+    if (host == NULL) {
+        // fs = FileSytem::newInstanceLocal(conf);
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstanceLocal",
+                         JMETHOD1(JPARAM(HADOOP_CONF),
+                                  JPARAM(HADOOP_LOCALFS)),
+                         jConfiguration) != 0) {
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+                                       "FileSystem::newInstanceLocal");
+            goto done;
+        }
+        jFS = jVal.l;
+    }
+    else if (!strcmp(host, "default") && port == 0) {
+        //fs = FileSystem::get(conf); 
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
+                         HADOOP_FS, "newInstance",
+                         JMETHOD1(JPARAM(HADOOP_CONF),
+                                  JPARAM(HADOOP_FS)),
+                         jConfiguration) != 0) {
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+                                       "FileSystem::newInstance");
+            goto done;
+        }
+        jFS = jVal.l;
+    }
+    else {
+        // fs = FileSystem::newInstance(URI, conf);
+        cURI = malloc(strlen(host)+16);
+        sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
+
+        jURIString = (*env)->NewStringUTF(env, cURI);
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
+                         "create", "(Ljava/lang/String;)Ljava/net/URI;",
+                         jURIString) != 0) {
+            errno = errnoFromException(jExc, env, "java.net.URI::create");
+            goto done;
+        }
+        jURI = jVal.l;
+
+        if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstance",
+                         JMETHOD2(JPARAM(JAVA_NET_URI),
+                                  JPARAM(HADOOP_CONF), JPARAM(HADOOP_FS)),
+                         jURI, jConfiguration) != 0) {
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+                                       "Filesystem::newInstance(URI, Configuration)");
+            goto done;
+        }
+
+        jFS = jVal.l;
+    }
+
+  done:
+    
+    // Release unnecessary local references
+    destroyLocalReference(env, jConfiguration);
+    destroyLocalReference(env, jURIString);
+    destroyLocalReference(env, jURI);
+
+    if (cURI) free(cURI);
+
+    /* Create a global reference for this fs */
+    if (jFS) {
+        gFsRef = (*env)->NewGlobalRef(env, jFS);
+        destroyLocalReference(env, jFS);
+    }
+
+    return gFsRef;
+}
 
 int hdfsDisconnect(hdfsFS fs)
 {

Modified: hadoop/core/trunk/src/c++/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/libhdfs/hdfs.h?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/libhdfs/hdfs.h (original)
+++ hadoop/core/trunk/src/c++/libhdfs/hdfs.h Thu Mar 19 19:38:12 2009
@@ -90,7 +90,6 @@
     };
     typedef struct hdfsFile_internal* hdfsFile;
       
-
     /** 
      * hdfsConnectAsUser - Connect to a hdfs file system as a specific user
      * Connect to the hdfs.
@@ -121,6 +120,14 @@
      hdfsFS hdfsConnect(const char* host, tPort port);
 
 
+    /**
+     * This are the same as hdfsConnectAsUser except that every invocation returns a new FileSystem handle.
+     * Applications should call a hdfsDisconnect for every call to hdfsConnectAsUserNewInstance.
+     */
+     hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char *groups[], int groups_size );
+     hdfsFS hdfsConnectNewInstance(const char* host, tPort port);
+     hdfsFS hdfsConnectPath(const char* uri);
+
     /** 
      * hdfsDisconnect - Disconnect from the hdfs file system.
      * Disconnect from hdfs.

Modified: hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/libhdfs/hdfs_test.c?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c (original)
+++ hadoop/core/trunk/src/c++/libhdfs/hdfs_test.c Thu Mar 19 19:38:12 2009
@@ -52,13 +52,13 @@
 
 int main(int argc, char **argv) {
 
-    hdfsFS fs = hdfsConnect("default", 0);
+    hdfsFS fs = hdfsConnectNewInstance("default", 0);
     if(!fs) {
         fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
         exit(-1);
     } 
  
-    hdfsFS lfs = hdfsConnect(NULL, 0);
+    hdfsFS lfs = hdfsConnectNewInstance(NULL, 0);
     if(!lfs) {
         fprintf(stderr, "Oops! Failed to connect to 'local' hdfs!\n");
         exit(-1);
@@ -401,7 +401,7 @@
       groups[0] = "users";
       groups[1] = "nobody";
 
-      fs = hdfsConnectAsUser("default", 0, tuser, groups, 2);
+      fs = hdfsConnectAsUserNewInstance("default", 0, tuser, groups, 2);
       if(!fs) {
         fprintf(stderr, "Oops! Failed to connect to hdfs as user %s!\n",tuser);
         exit(-1);

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java Thu Mar 19 19:38:12 2009
@@ -185,6 +185,47 @@
     return CACHE.get(uri, conf);
   }
 
+  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
+   * of the URI determines a configuration property name,
+   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
+   * The entire URI is passed to the FileSystem instance's initialize method.
+   * This always returns a new FileSystem object.
+   */
+  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
+    String scheme = uri.getScheme();
+    String authority = uri.getAuthority();
+
+    if (scheme == null) {                       // no scheme: use default FS
+      return newInstance(conf);
+    }
+
+    if (authority == null) {                       // no authority
+      URI defaultUri = getDefaultUri(conf);
+      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
+          && defaultUri.getAuthority() != null) {  // & default has authority
+        return newInstance(defaultUri, conf);              // return default
+      }
+    }
+    return CACHE.getUnique(uri, conf);
+  }
+
+  /** Returns a unique configured filesystem implementation.
+   * This always returns a new FileSystem object. */
+  public static FileSystem newInstance(Configuration conf) throws IOException {
+    return newInstance(getDefaultUri(conf), conf);
+  }
+
+  /**
+   * Get a unique local file system object
+   * @param conf the configuration to configure the file system with
+   * @return a LocalFileSystem
+   * This always returns a new FileSystem object.
+   */
+  public static LocalFileSystem newInstanceLocal(Configuration conf)
+    throws IOException {
+    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
+  }
+
   private static class ClientFinalizer extends Thread {
     public synchronized void run() {
       try {
@@ -1360,8 +1401,21 @@
   static class Cache {
     private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
 
+    /** A variable that makes all objects in the cache unique */
+    private static AtomicLong unique = new AtomicLong(1);
+
     synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
       Key key = new Key(uri, conf);
+      return getInternal(uri, conf, key);
+    }
+
+    /** The objects inserted into the cache using this method are all unique */
+    synchronized FileSystem getUnique(URI uri, Configuration conf) throws IOException{
+      Key key = new Key(uri, conf, unique.getAndIncrement());
+      return getInternal(uri, conf, key);
+    }
+
+    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
       FileSystem fs = map.get(key);
       if (fs == null) {
         fs = createFileSystem(uri, conf);
@@ -1416,10 +1470,16 @@
       final String scheme;
       final String authority;
       final String username;
+      final long unique;   // an artificial way to make a key unique
 
       Key(URI uri, Configuration conf) throws IOException {
+        this(uri, conf, 0);
+      }
+
+      Key(URI uri, Configuration conf, long unique) throws IOException {
         scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
         authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
+        this.unique = unique;
         UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
         if (ugi == null) {
           try {
@@ -1433,7 +1493,7 @@
 
       /** {@inheritDoc} */
       public int hashCode() {
-        return (scheme + authority + username).hashCode();
+        return (scheme + authority + username).hashCode() + (int)unique;
       }
 
       static boolean isEqual(Object a, Object b) {
@@ -1449,7 +1509,8 @@
           Key that = (Key)obj;
           return isEqual(this.scheme, that.scheme)
                  && isEqual(this.authority, that.authority)
-                 && isEqual(this.username, that.username);
+                 && isEqual(this.username, that.username)
+                 && (this.unique == that.unique);
         }
         return false;        
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=756152&r1=756151&r2=756152&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 19 19:38:12 2009
@@ -618,4 +618,20 @@
     assertTrue(map.containsKey(lowercaseCachekey2));    
 
   }
+
+  public static void testFsUniqueness(long megaBytes, int numFiles, long seed)
+    throws Exception {
+
+    // multiple invocations of FileSystem.get return the same object.
+    FileSystem fs1 = FileSystem.get(conf);
+    FileSystem fs2 = FileSystem.get(conf);
+    assertTrue(fs1 == fs2);
+
+    // multiple invocations of FileSystem.newInstance return different objects
+    fs1 = FileSystem.newInstance(conf);
+    fs2 = FileSystem.newInstance(conf);
+    assertTrue(fs1 != fs2 && !fs1.equals(fs2));
+    fs1.close();
+    fs2.close();
+  }
 }