You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cn...@apache.org on 2013/09/28 00:51:14 UTC

svn commit: r1527113 [2/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/native/libhdfs/ src/main/native/libhdfs/test/ src/mai...

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h Fri Sep 27 22:51:12 2013
@@ -114,6 +114,47 @@ jthrowable classNameOfObject(jobject job
  * */
 JNIEnv* getJNIEnv(void);
 
+/**
+ * Figure out if a Java object is an instance of a particular class.
+ *
+ * @param env  The Java environment.
+ * @param obj  The object to check.
+ * @param name The class name to check.
+ *
+ * @return     -1 if we failed to find the referenced class name.
+ *             0 if the object is not of the given class.
+ *             1 if the object is of the given class.
+ */
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
+
+/**
+ * Set a value in a configuration object.
+ *
+ * @param env               The JNI environment
+ * @param jConfiguration    The configuration object to modify
+ * @param key               The key to modify
+ * @param value             The value to set the key to
+ *
+ * @return                  NULL on success; exception otherwise
+ */
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+        const char *key, const char *value);
+
+/**
+ * Fetch an instance of an Enum.
+ *
+ * @param env               The JNI environment.
+ * @param className         The enum class name.
+ * @param valueName         The name of the enum value
+ * @param out               (out param) on success, a local reference to an
+ *                          instance of the enum object.  (Since Java enums are
+ *                          singletones, this is also the only instance.)
+ *
+ * @return                  NULL on success; exception otherwise
+ */
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+                             const char *valueName, jobject *out);
+
 #endif /*LIBHDFS_JNI_HELPER_H*/
 
 /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c Fri Sep 27 22:51:12 2013
@@ -17,14 +17,19 @@
  */
 
 #include "exception.h"
+#include "hdfs.h"
+#include "hdfs_test.h"
 #include "jni_helper.h"
 #include "native_mini_dfs.h"
 
 #include <errno.h>
 #include <jni.h>
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
 #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
      * The NativeMiniDfsCluster object
      */
     jobject obj;
+
+    /**
+     * Path to the domain socket, or the empty string if there is none.
+     */
+    char domainSocketPath[PATH_MAX];
 };
 
+static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
+              struct NativeMiniDfsCluster *cl, jobject cobj)
+{
+    jthrowable jthr;
+    char *tmpDir;
+
+    int ret = hdfsDisableDomainSocketSecurity();
+    if (ret) {
+        return newRuntimeError(env, "failed to disable hdfs domain "
+                               "socket security: error %d", ret);
+    }
+    jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
+    if (jthr) {
+        return jthr;
+    }
+    tmpDir = getenv("TMPDIR");
+    if (!tmpDir) {
+        tmpDir = "/tmp";
+    }
+    snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+             tmpDir, getpid(), rand());
+    snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+             tmpDir, getpid(), rand());
+    jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
+                            cl->domainSocketPath);
+    if (jthr) {
+        return jthr;
+    }
+    return NULL;
+}
+
 struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
 {
     struct NativeMiniDfsCluster* cl = NULL;
@@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(s
             goto error;
         }
     }
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Configuration::setBoolean");
+        goto error;
+    }
+    // Disable 'minimum block size' -- it's annoying in tests.
+    (*env)->DeleteLocalRef(env, jconfStr);
+    jconfStr = NULL;
+    jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: new String");
+        goto error;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+                        "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                              "nmdCreate: Configuration::setLong");
+        goto error;
+    }
+    // Creae MiniDFSCluster object
     jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
                     "(L"HADOOP_CONF";)V", cobj);
     if (jthr) {
@@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(s
             "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
         goto error;
     }
+    if (conf->configureShortCircuit) {
+        jthr = nmdConfigureShortCircuit(env, cl, cobj);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "nmdCreate: nmdConfigureShortCircuit error");
+            goto error;
+        }
+    }
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
             "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
     if (jthr) {
@@ -272,3 +343,29 @@ error_dlr_nn:
     
     return ret;
 }
+
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld)
+{
+    int port, ret;
+
+    hdfsBuilderSetNameNode(bld, "localhost");
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+      fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
+      return EIO;
+    }
+    hdfsBuilderSetNameNodePort(bld, port);
+    if (cl->domainSocketPath[0]) {
+      ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
+      if (ret) {
+          return ret;
+      }
+      ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
+                            cl->domainSocketPath);
+      if (ret) {
+          return ret;
+      }
+    }
+    return 0;
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h Fri Sep 27 22:51:12 2013
@@ -21,6 +21,7 @@
 
 #include <jni.h> /* for jboolean */
 
+struct hdfsBuilder;
 struct NativeMiniDfsCluster; 
 
 /**
@@ -28,17 +29,24 @@ struct NativeMiniDfsCluster; 
  */
 struct NativeMiniDfsConf {
     /**
-     * Nonzero if the cluster should be formatted prior to startup
+     * Nonzero if the cluster should be formatted prior to startup.
      */
     jboolean doFormat;
+
     /**
      * Whether or not to enable webhdfs in MiniDfsCluster
      */
     jboolean webhdfsEnabled;
+
     /**
      * The http port of the namenode in MiniDfsCluster
      */
     jint namenodeHttpPort;
+
+    /**
+     * Nonzero if we should configure short circuit.
+     */
+    jboolean configureShortCircuit;
 };
 
 /**
@@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster
  *
  * @return          the port, or a negative error code
  */
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); 
 
 /**
  * Get the http address that's in use by the given (non-HA) nativeMiniDfs
@@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct Nati
 int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
                                int *port, const char **hostName);
 
+/**
+ * Configure the HDFS builder appropriately to connect to this cluster.
+ *
+ * @param bld       The hdfs builder
+ *
+ * @return          the port, or a negative error code
+ */
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld);
+
 #endif

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c Fri Sep 27 22:51:12 2013
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs.h"
+#include "native_mini_dfs.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TEST_FILE_NAME_LENGTH 128
+#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
+#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
+#define TEST_ZEROCOPY_NUM_BLOCKS 6
+#define SMALL_READ_LEN 16
+
+#define ZC_BUF_LEN 32768
+
+static uint8_t *getZeroCopyBlockData(int blockIdx)
+{
+    uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    int i;
+    if (!buf) {
+        fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+        exit(1);
+    }
+    for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
+      buf[i] = blockIdx + (i % 17);
+    }
+    return buf;
+}
+
+static int getZeroCopyBlockLen(int blockIdx)
+{
+    if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
+        return 0;
+    } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
+        return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
+    } else {
+        return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
+    }
+}
+
+static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
+
+static void printBuf(const uint8_t *buf, size_t len)
+{
+  size_t i;
+
+  for (i = 0; i < len; i++) {
+    fprintf(stderr, "%02x", buf[i]);
+  }
+  fprintf(stderr, "\n");
+}
+
+static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
+{
+    hdfsFile file = NULL;
+    struct hadoopRzOptions *opts = NULL;
+    struct hadoopRzBuffer *buffer = NULL;
+    uint8_t *block;
+
+    file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+    opts = hadoopRzOptionsAlloc();
+    EXPECT_NONNULL(opts);
+    EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
+    /* haven't read anything yet */
+    EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
+    block = getZeroCopyBlockData(0);
+    EXPECT_NONNULL(block);
+    /* first read is half of a block. */
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+          hadoopRzBufferLength(buffer));
+    EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    hadoopRzBufferFree(file, buffer);
+    /* read the next half of the block */
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+          hadoopRzBufferLength(buffer));
+    EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
+          block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    hadoopRzBufferFree(file, buffer);
+    free(block);
+    EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE, 
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    /* Now let's read just a few bytes. */
+    buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
+    block = getZeroCopyBlockData(1);
+    EXPECT_NONNULL(block);
+    EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
+    hadoopRzBufferFree(file, buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+                  hdfsTell(fs, file));
+    EXPECT_ZERO(expectFileStats(file,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+
+    /* Clear 'skip checksums' and test that we can't do zero-copy reads any
+     * more.  Since there is no ByteBufferPool set, we should fail with
+     * EPROTONOSUPPORT.
+     */
+    EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
+    EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
+
+    /* Now set a ByteBufferPool and try again.  It should succeed this time. */
+    EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
+          ELASTIC_BYTE_BUFFER_POOL_CLASS));
+    buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    EXPECT_NONNULL(buffer);
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
+    EXPECT_ZERO(expectFileStats(file,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+    EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
+        TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
+    free(block);
+    block = getZeroCopyBlockData(2);
+    EXPECT_NONNULL(block);
+    EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
+        (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
+    hadoopRzBufferFree(file, buffer);
+    free(block);
+    hadoopRzOptionsFree(opts);
+    EXPECT_ZERO(hdfsCloseFile(fs, file));
+    return 0;
+}
+
+static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
+                                  size_t testFileNameLen)
+{
+    int blockIdx, blockLen;
+    hdfsFile file;
+    uint8_t *data;
+
+    snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
+             getpid(), rand());
+    file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
+                        TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    EXPECT_NONNULL(file);
+    for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
+        blockLen = getZeroCopyBlockLen(blockIdx);
+        data = getZeroCopyBlockData(blockIdx);
+        EXPECT_NONNULL(data);
+        EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
+    }
+    EXPECT_ZERO(hdfsCloseFile(fs, file));
+    return 0;
+}
+
+/**
+ * Test that we can write a file with libhdfs and then read it back
+ */
+int main(void)
+{
+    int port;
+    struct NativeMiniDfsConf conf = {
+        .doFormat = 1,
+        .configureShortCircuit = 1,
+    };
+    char testFileName[TEST_FILE_NAME_LENGTH];
+    hdfsFS fs;
+    struct NativeMiniDfsCluster* cl;
+    struct hdfsBuilder *bld;
+
+    cl = nmdCreate(&conf);
+    EXPECT_NONNULL(cl);
+    EXPECT_ZERO(nmdWaitClusterUp(cl));
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+        fprintf(stderr, "TEST_ERROR: test_zerocopy: "
+                "nmdGetNameNodePort returned error %d\n", port);
+        return EXIT_FAILURE;
+    }
+    bld = hdfsNewBuilder();
+    EXPECT_NONNULL(bld);
+    EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
+    hdfsBuilderSetForceNewInstance(bld);
+    hdfsBuilderConfSetStr(bld, "dfs.block.size",
+                          TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    /* ensure that we'll always get our mmaps */
+    hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
+                          "true");
+    fs = hdfsBuilderConnect(bld);
+    EXPECT_NONNULL(fs);
+    EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
+          TEST_FILE_NAME_LENGTH));
+    EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
+    EXPECT_ZERO(hdfsDisconnect(fs));
+    EXPECT_ZERO(nmdShutdown(cl));
+    nmdFree(cl);
+    fprintf(stderr, "TEST_SUCCESS\n"); 
+    return EXIT_SUCCESS;
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Sep 27 22:51:12 2013
@@ -1415,4 +1415,32 @@
 	  linearly increases.
 	</description>
 </property>
+
+<property>
+  <name>dfs.client.mmap.cache.size</name>
+  <value>1024</value>
+  <description>
+    When zero-copy reads are used, the DFSClient keeps a cache of recently used
+    memory mapped regions.  This parameter controls the maximum number of
+    entries that we will keep in that cache.
+
+    If this is set to 0, we will not allow mmap.
+
+    The larger this number is, the more file descriptors we will potentially
+    use for memory-mapped files.  mmaped files also use virtual address space.
+    You may need to increase your ulimit virtual address space limits before
+    increasing the client mmap cache size.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.mmap.cache.timeout.ms</name>
+  <value>900000</value>
+  <description>
+    The minimum length of time that we will keep an mmap entry in the cache
+    between uses.  If an entry is in the cache longer than this, and nobody
+    uses it, it will be removed by a background thread.
+  </description>
+</property>
+
 </configuration>

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+/**
+ * This class tests if EnhancedByteBufferAccess works correctly.
+ */
+public class TestEnhancedByteBufferAccess {
+  private static final Log LOG =
+      LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
+
+  static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  private static byte[] byteBufferToArray(ByteBuffer buf) {
+    byte resultArray[] = new byte[buf.remaining()];
+    buf.get(resultArray);
+    buf.flip();
+    return resultArray;
+  }
+  
+  public static HdfsConfiguration initZeroCopyTest() {
+    Assume.assumeTrue(NativeIO.isAvailable());
+    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+    return conf;
+  }
+
+  @Test
+  public void testZeroCopyReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    final int TEST_FILE_LENGTH = 12345;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      ByteBuffer result = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+      fsIn.releaseBuffer(result);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void testShortZeroCopyReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    final int TEST_FILE_LENGTH = 12345;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+
+      // Try to read 8192, but only get 4096 because of the block size.
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      ByteBuffer result =
+        dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+      dfsIn.releaseBuffer(result);
+      
+      // Try to read 4097, but only get 4096 because of the block size.
+      result = 
+          dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
+          byteBufferToArray(result));
+      dfsIn.releaseBuffer(result);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testZeroCopyReadsNoFallback() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    final int TEST_FILE_LENGTH = 12345;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      ByteBuffer result;
+      try {
+        result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+      result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  private static class CountingVisitor
+      implements ClientMmapManager.ClientMmapVisitor {
+    int count = 0;
+
+    @Override
+    public void accept(ClientMmap mmap) {
+      count++;
+    }
+
+    public void reset() {
+      count = 0;
+    }
+  }
+
+  @Test
+  public void testZeroCopyMmapCache() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FSDataInputStream fsIn = null;
+    ByteBuffer results[] = { null, null, null, null, null };
+    
+    DistributedFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
+      final CountingVisitor countingVisitor = new CountingVisitor();
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      results[0] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      fsIn.seek(0);
+      results[1] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(1, countingVisitor.count);
+      countingVisitor.reset();
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      countingVisitor.reset();
+
+      // The mmaps should be of the first block of the file.
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
+        @Override
+        public void accept(ClientMmap mmap) {
+          Assert.assertEquals(firstBlock, mmap.getBlock());
+        }
+      });
+
+      // Read more blocks.
+      results[2] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      results[3] = fsIn.read(null, 4096,
+          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      try {
+        results[4] = fsIn.read(null, 4096,
+            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+
+      // we should have 3 mmaps, 0 evictable
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(3, countingVisitor.count);
+      countingVisitor.reset();
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+
+      // After we close the cursors, the mmaps should be evictable for 
+      // a brief period of time.  Then, they should be closed (we're 
+      // using a very quick timeout)
+      for (ByteBuffer buffer : results) {
+        if (buffer != null) {
+          fsIn.releaseBuffer(buffer);
+        }
+      }
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        public Boolean get() {
+          countingVisitor.reset();
+          try {
+            mmapManager.visitEvictable(countingVisitor);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+            return false;
+          }
+          return (0 == countingVisitor.count);
+        }
+      }, 10, 10000);
+      countingVisitor.reset();
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test HDFS fallback reads.  HDFS streams support the ByteBufferReadable
+   * interface.
+   */
+  @Test
+  public void testHdfsFallbackReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FSDataInputStream fsIn = null;
+    
+    DistributedFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      testFallbackImpl(fsIn, original);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  private static class RestrictedAllocatingByteBufferPool
+      implements ByteBufferPool {
+    private final boolean direct;
+
+    RestrictedAllocatingByteBufferPool(boolean direct) {
+      this.direct = direct;
+    }
+    @Override
+    public ByteBuffer getBuffer(boolean direct, int length) {
+      Preconditions.checkArgument(this.direct == direct);
+      return direct ? ByteBuffer.allocateDirect(length) :
+        ByteBuffer.allocate(length);
+    }
+    @Override
+    public void putBuffer(ByteBuffer buffer) {
+    }
+  }
+  
+  private static void testFallbackImpl(InputStream stream,
+      byte original[]) throws Exception {
+    RestrictedAllocatingByteBufferPool bufferPool =
+        new RestrictedAllocatingByteBufferPool(
+            stream instanceof ByteBufferReadable);
+
+    ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+    Assert.assertEquals(10, result.remaining());
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
+        byteBufferToArray(result));
+
+    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
+    Assert.assertEquals(5000, result.remaining());
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
+        byteBufferToArray(result));
+
+    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
+    Assert.assertEquals(11375, result.remaining());
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
+        byteBufferToArray(result));
+
+    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+    Assert.assertNull(result);
+  }
+
+  /**
+   * Test the {@link ByteBufferUtil#fallbackRead} function directly.
+   */
+  @Test
+  public void testFallbackRead() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FSDataInputStream fsIn = null;
+    
+    DistributedFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      testFallbackImpl(fsIn, original);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test fallback reads on a stream which does not support the
+   * ByteBufferReadable * interface.
+   */
+  @Test
+  public void testIndirectFallbackReads() throws Exception {
+    final File TEST_DIR = new File(
+      System.getProperty("test.build.data","build/test/data"));
+    final String TEST_PATH = TEST_DIR + File.separator +
+        "indirectFallbackTestFile";
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FileOutputStream fos = null;
+    FileInputStream fis = null;
+    try {
+      fos = new FileOutputStream(TEST_PATH);
+      Random random = new Random(RANDOM_SEED);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      random.nextBytes(original);
+      fos.write(original);
+      fos.close();
+      fos = null;
+      fis = new FileInputStream(TEST_PATH);
+      testFallbackImpl(fis, original);
+    } finally {
+      IOUtils.cleanup(LOG, fos, fis);
+      new File(TEST_PATH).delete();
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Fri Sep 27 22:51:12 2013
@@ -25,7 +25,6 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,11 +35,26 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestBlockReaderLocal {
+  private static TemporarySocketDirectory sockDir;
+  
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+  
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+  
   public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
       int off2, int len) {
     for (int i = 0; i < len; i++) {
@@ -100,10 +114,11 @@ public class TestBlockReaderLocal {
     FSDataInputStream fsIn = null;
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
     
+    FileSystem fs = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
-      FileSystem fs = cluster.getFileSystem();
+      fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, TEST_PATH,
           BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
       try {
@@ -138,6 +153,7 @@ public class TestBlockReaderLocal {
       test.doTest(blockReaderLocal, original);
     } finally {
       if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
       if (cluster != null) cluster.shutdown();
       if (dataIn != null) dataIn.close();
       if (checkIn != null) checkIn.close();
@@ -382,10 +398,11 @@ public class TestBlockReaderLocal {
     final long RANDOM_SEED = 4567L;
     FSDataInputStream fsIn = null;
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+    FileSystem fs = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
-      FileSystem fs = cluster.getFileSystem();
+      fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, TEST_PATH,
           BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
       try {
@@ -417,6 +434,7 @@ public class TestBlockReaderLocal {
     } finally {
       DFSInputStream.tcpReadsDisabledForTesting = false;
       if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
       if (cluster != null) cluster.shutdown();
       if (sockDir != null) sockDir.close();
     }