Posted to hdfs-commits@hadoop.apache.org by cn...@apache.org on 2013/09/28 00:51:14 UTC
svn commit: r1527113 [2/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/client/ src/main/native/libhdfs/
src/main/native/libhdfs/test/ src/mai...
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h Fri Sep 27 22:51:12 2013
@@ -114,6 +114,47 @@ jthrowable classNameOfObject(jobject job
* */
JNIEnv* getJNIEnv(void);
+/**
+ * Figure out if a Java object is an instance of a particular class.
+ *
+ * @param env The Java environment.
+ * @param obj The object to check.
+ * @param name The class name to check.
+ *
+ * @return -1 if we failed to find the referenced class name.
+ * 0 if the object is not of the given class.
+ * 1 if the object is of the given class.
+ */
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
+
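A minimal caller sketch for this helper (illustrative only; assumes a valid
JNIEnv *env and a jobject obj obtained elsewhere):

    int ret = javaObjectIsOfClass(env, obj,
            "org/apache/hadoop/conf/Configuration");
    if (ret < 0) {
        /* -1: we failed to find the referenced class name */
    } else if (ret == 0) {
        /* obj is not a Configuration */
    } else {
        /* obj is a Configuration */
    }
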
+/**
+ * Set a value in a configuration object.
+ *
+ * @param env The JNI environment
+ * @param jConfiguration The configuration object to modify
+ * @param key The key to modify
+ * @param value The value to set the key to
+ *
+ * @return NULL on success; exception otherwise
+ */
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+ const char *key, const char *value);
+
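The call pattern matches nmdConfigureShortCircuit in native_mini_dfs.c below;
a sketch, assuming env and a Configuration reference jConfiguration (the EIO
return is just one way a caller might surface the failure):

    jthrowable jthr = hadoopConfSetStr(env, jConfiguration,
            "dfs.client.read.shortcircuit", "true");
    if (jthr) {
        /* print and free the pending exception, then give up */
        printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hadoopConfSetStr");
        return EIO;
    }
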
+/**
+ * Fetch an instance of an Enum.
+ *
+ * @param env The JNI environment.
+ * @param className The enum class name.
+ * @param valueName The name of the enum value.
+ * @param out (out param) on success, a local reference to an
+ * instance of the enum object. (Since Java enums are
+ * singletons, this is also the only instance.)
+ *
+ * @return NULL on success; exception otherwise
+ */
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+ const char *valueName, jobject *out);
+
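A sketch of fetching one enum singleton; ReadOption.SKIP_CHECKSUMS is the
value the zero-copy tests exercise, though the class name string used here is
an assumption:

    jobject skipChecksums = NULL;
    jthrowable jthr = fetchEnumInstance(env, "org/apache/hadoop/fs/ReadOption",
            "SKIP_CHECKSUMS", &skipChecksums);
    if (jthr) {
        printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "fetchEnumInstance");
        return EIO;
    }
    /* ... use the local reference, then release it ... */
    (*env)->DeleteLocalRef(env, skipChecksums);
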
#endif /*LIBHDFS_JNI_HELPER_H*/
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c Fri Sep 27 22:51:12 2013
@@ -17,14 +17,19 @@
*/
#include "exception.h"
+#include "hdfs.h"
+#include "hdfs_test.h"
#include "jni_helper.h"
#include "native_mini_dfs.h"
#include <errno.h>
#include <jni.h>
+#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
* The NativeMiniDfsCluster object
*/
jobject obj;
+
+ /**
+ * Path to the domain socket, or the empty string if there is none.
+ */
+ char domainSocketPath[PATH_MAX];
};
+static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
+ struct NativeMiniDfsCluster *cl, jobject cobj)
+{
+ jthrowable jthr;
+ char *tmpDir;
+
+ int ret = hdfsDisableDomainSocketSecurity();
+ if (ret) {
+ return newRuntimeError(env, "failed to disable hdfs domain "
+ "socket security: error %d", ret);
+ }
+ jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
+ if (jthr) {
+ return jthr;
+ }
+ tmpDir = getenv("TMPDIR");
+ if (!tmpDir) {
+ tmpDir = "/tmp";
+ }
+ snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+ tmpDir, getpid(), rand());
+ jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
+ cl->domainSocketPath);
+ if (jthr) {
+ return jthr;
+ }
+ return NULL;
+}
+
struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
{
struct NativeMiniDfsCluster* cl = NULL;
@@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(s
goto error;
}
}
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: Configuration::setBoolean");
+ goto error;
+ }
+ // Disable 'minimum block size' -- it's annoying in tests.
+ (*env)->DeleteLocalRef(env, jconfStr);
+ jconfStr = NULL;
+ jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: new String");
+ goto error;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+ "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: Configuration::setLong");
+ goto error;
+ }
+ // Create MiniDFSCluster object
jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
"(L"HADOOP_CONF";)V", cobj);
if (jthr) {
@@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(s
"nmdCreate: NativeMiniDfsCluster#Builder#Builder");
goto error;
}
+ if (conf->configureShortCircuit) {
+ jthr = nmdConfigureShortCircuit(env, cl, cobj);
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: nmdConfigureShortCircuit error");
+ goto error;
+ }
+ }
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
"format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
if (jthr) {
@@ -272,3 +343,29 @@ error_dlr_nn:
return ret;
}
+
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+ struct hdfsBuilder *bld)
+{
+ int port, ret;
+
+ hdfsBuilderSetNameNode(bld, "localhost");
+ port = nmdGetNameNodePort(cl);
+ if (port < 0) {
+ fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
+ return EIO;
+ }
+ hdfsBuilderSetNameNodePort(bld, port);
+ if (cl->domainSocketPath[0]) {
+ ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
+ if (ret) {
+ return ret;
+ }
+ ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
+ cl->domainSocketPath);
+ if (ret) {
+ return ret;
+ }
+ }
+ return 0;
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h Fri Sep 27 22:51:12 2013
@@ -21,6 +21,7 @@
#include <jni.h> /* for jboolean */
+struct hdfsBuilder;
struct NativeMiniDfsCluster;
/**
@@ -28,17 +29,24 @@ struct NativeMiniDfsCluster;
*/
struct NativeMiniDfsConf {
/**
- * Nonzero if the cluster should be formatted prior to startup
+ * Nonzero if the cluster should be formatted prior to startup.
*/
jboolean doFormat;
+
/**
* Whether or not to enable webhdfs in MiniDfsCluster
*/
jboolean webhdfsEnabled;
+
/**
* The http port of the namenode in MiniDfsCluster
*/
jint namenodeHttpPort;
+
+ /**
+ * Nonzero if we should configure short-circuit local reads.
+ */
+ jboolean configureShortCircuit;
};
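Callers can rely on C99 designated initializers here, as the new zero-copy
test's main() does below; fields left out default to zero/false:

    struct NativeMiniDfsConf conf = {
        .doFormat = 1,              /* format before startup */
        .configureShortCircuit = 1, /* domain-socket short-circuit reads */
    };
    struct NativeMiniDfsCluster *cl = nmdCreate(&conf);
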
/**
@@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster
*
* @return the port, or a negative error code
*/
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
/**
* Get the http address that's in use by the given (non-HA) nativeMiniDfs
@@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct Nati
int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
int *port, const char **hostName);
+/**
+ * Configure the HDFS builder appropriately to connect to this cluster.
+ *
+ * @param cl The cluster to configure the builder against
+ * @param bld The hdfs builder
+ *
+ * @return 0 on success; a nonzero error code on failure
+ */
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+ struct hdfsBuilder *bld);
+
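The intended call sequence mirrors the new zero-copy test below (error
handling elided here for brevity):

    struct hdfsBuilder *bld = hdfsNewBuilder();
    /* points bld at localhost:<namenode port>, plus the short-circuit
     * settings when the cluster was created with configureShortCircuit */
    nmdConfigureHdfsBuilder(cl, bld);
    hdfsFS fs = hdfsBuilderConnect(bld);
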
#endif
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c Fri Sep 27 22:51:12 2013
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs.h"
+#include "native_mini_dfs.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TEST_FILE_NAME_LENGTH 128
+#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
+#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
+#define TEST_ZEROCOPY_NUM_BLOCKS 6
+#define SMALL_READ_LEN 16
+
+#define ZC_BUF_LEN 32768
+
+static uint8_t *getZeroCopyBlockData(int blockIdx)
+{
+ uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ int i;
+ if (!buf) {
+ fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ exit(1);
+ }
+ for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
+ buf[i] = blockIdx + (i % 17);
+ }
+ return buf;
+}
+
+static int getZeroCopyBlockLen(int blockIdx)
+{
+ if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
+ return 0;
+ } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
+ return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
+ } else {
+ return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
+ }
+}
+
+static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
+
+static void printBuf(const uint8_t *buf, size_t len)
+{
+ size_t i;
+
+ for (i = 0; i < len; i++) {
+ fprintf(stderr, "%02x", buf[i]);
+ }
+ fprintf(stderr, "\n");
+}
+
+static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
+{
+ hdfsFile file = NULL;
+ struct hadoopRzOptions *opts = NULL;
+ struct hadoopRzBuffer *buffer = NULL;
+ uint8_t *block;
+
+ file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
+ EXPECT_NONNULL(file);
+ opts = hadoopRzOptionsAlloc();
+ EXPECT_NONNULL(opts);
+ EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
+ /* haven't read anything yet */
+ EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
+ block = getZeroCopyBlockData(0);
+ EXPECT_NONNULL(block);
+ /* first read is half of a block. */
+ buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+ EXPECT_NONNULL(buffer);
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+ hadoopRzBufferLength(buffer));
+ EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ hadoopRzBufferFree(file, buffer);
+ /* read the next half of the block */
+ buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
+ EXPECT_NONNULL(buffer);
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+ hadoopRzBufferLength(buffer));
+ EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
+ block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ hadoopRzBufferFree(file, buffer);
+ free(block);
+ EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+ /* Now let's read just a few bytes. */
+ buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
+ EXPECT_NONNULL(buffer);
+ EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
+ block = getZeroCopyBlockData(1);
+ EXPECT_NONNULL(block);
+ EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
+ hadoopRzBufferFree(file, buffer);
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ hdfsTell(fs, file));
+ EXPECT_ZERO(expectFileStats(file,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+
+ /* Clear 'skip checksums' and test that we can't do zero-copy reads any
+ * more. Since there is no ByteBufferPool set, we should fail with
+ * EPROTONOSUPPORT.
+ */
+ EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
+ EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+ EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
+
+ /* Now set a ByteBufferPool and try again. It should succeed this time. */
+ EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
+ ELASTIC_BYTE_BUFFER_POOL_CLASS));
+ buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ EXPECT_NONNULL(buffer);
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
+ EXPECT_ZERO(expectFileStats(file,
+ (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+ (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+ (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+ EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
+ free(block);
+ block = getZeroCopyBlockData(2);
+ EXPECT_NONNULL(block);
+ EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
+ (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
+ hadoopRzBufferFree(file, buffer);
+ free(block);
+ hadoopRzOptionsFree(opts);
+ EXPECT_ZERO(hdfsCloseFile(fs, file));
+ return 0;
+}
+
+static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
+ size_t testFileNameLen)
+{
+ int blockIdx, blockLen;
+ hdfsFile file;
+ uint8_t *data;
+
+ snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
+ getpid(), rand());
+ file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ EXPECT_NONNULL(file);
+ for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
+ blockLen = getZeroCopyBlockLen(blockIdx);
+ data = getZeroCopyBlockData(blockIdx);
+ EXPECT_NONNULL(data);
+ EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
+ free(data);
+ }
+ EXPECT_ZERO(hdfsCloseFile(fs, file));
+ return 0;
+}
+
+/**
+ * Test that we can write a file with libhdfs and then read it back
+ */
+int main(void)
+{
+ int port;
+ struct NativeMiniDfsConf conf = {
+ .doFormat = 1,
+ .configureShortCircuit = 1,
+ };
+ char testFileName[TEST_FILE_NAME_LENGTH];
+ hdfsFS fs;
+ struct NativeMiniDfsCluster* cl;
+ struct hdfsBuilder *bld;
+
+ cl = nmdCreate(&conf);
+ EXPECT_NONNULL(cl);
+ EXPECT_ZERO(nmdWaitClusterUp(cl));
+ port = nmdGetNameNodePort(cl);
+ if (port < 0) {
+ fprintf(stderr, "TEST_ERROR: test_zerocopy: "
+ "nmdGetNameNodePort returned error %d\n", port);
+ return EXIT_FAILURE;
+ }
+ bld = hdfsNewBuilder();
+ EXPECT_NONNULL(bld);
+ EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
+ hdfsBuilderSetForceNewInstance(bld);
+ hdfsBuilderConfSetStr(bld, "dfs.block.size",
+ TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+ /* ensure that we'll always get our mmaps */
+ hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
+ "true");
+ fs = hdfsBuilderConnect(bld);
+ EXPECT_NONNULL(fs);
+ EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
+ TEST_FILE_NAME_LENGTH));
+ EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
+ EXPECT_ZERO(hdfsDisconnect(fs));
+ EXPECT_ZERO(nmdShutdown(cl));
+ nmdFree(cl);
+ fprintf(stderr, "TEST_SUCCESS\n");
+ return EXIT_SUCCESS;
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Sep 27 22:51:12 2013
@@ -1415,4 +1415,32 @@
linearly increases.
</description>
</property>
+
+<property>
+ <name>dfs.client.mmap.cache.size</name>
+ <value>1024</value>
+ <description>
+ When zero-copy reads are used, the DFSClient keeps a cache of recently used
+ memory-mapped regions. This parameter controls the maximum number of
+ entries that we will keep in that cache.
+
+ If this is set to 0, we will not allow mmap.
+
+ The larger this number is, the more file descriptors we will potentially
+ use for memory-mapped files. mmapped files also use virtual address space.
+ You may need to raise your virtual address space ulimit before
+ increasing the client mmap cache size.
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.mmap.cache.timeout.ms</name>
+ <value>900000</value>
+ <description>
+ The minimum length of time that we will keep an mmap entry in the cache
+ between uses. If an entry is in the cache longer than this, and nobody
+ uses it, it will be removed by a background thread.
+ </description>
+</property>
+
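Both settings are client-side, so a libhdfs user can also override them
per-connection through the builder; a sketch (the values shown are only
illustrative):

    struct hdfsBuilder *bld = hdfsNewBuilder();
    /* cap the cache at 256 mmapped regions; evict unused entries after 60s */
    hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.size", "256");
    hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.timeout.ms", "60000");
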
</configuration>
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+/**
+ * This class tests if EnhancedByteBufferAccess works correctly.
+ */
+public class TestEnhancedByteBufferAccess {
+ private static final Log LOG =
+ LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
+
+ static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ private static byte[] byteBufferToArray(ByteBuffer buf) {
+ byte resultArray[] = new byte[buf.remaining()];
+ buf.get(resultArray);
+ buf.flip();
+ return resultArray;
+ }
+
+ public static HdfsConfiguration initZeroCopyTest() {
+ Assume.assumeTrue(NativeIO.isAvailable());
+ Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+ return conf;
+ }
+
+ @Test
+ public void testZeroCopyReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ final int TEST_FILE_LENGTH = 12345;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ ByteBuffer result = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ fsIn.releaseBuffer(result);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testShortZeroCopyReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ final int TEST_FILE_LENGTH = 12345;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+
+ // Try to read 8192, but only get 4096 because of the block size.
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ ByteBuffer result =
+ dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ dfsIn.releaseBuffer(result);
+
+ // Try to read 4097, but only get 4096 because of the block size.
+ result =
+ dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
+ byteBufferToArray(result));
+ dfsIn.releaseBuffer(result);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testZeroCopyReadsNoFallback() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ final int TEST_FILE_LENGTH = 12345;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ ByteBuffer result;
+ try {
+ result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
+ Assert.fail("expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+ result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ private static class CountingVisitor
+ implements ClientMmapManager.ClientMmapVisitor {
+ int count = 0;
+
+ @Override
+ public void accept(ClientMmap mmap) {
+ count++;
+ }
+
+ public void reset() {
+ count = 0;
+ }
+ }
+
+ @Test
+ public void testZeroCopyMmapCache() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FSDataInputStream fsIn = null;
+ ByteBuffer results[] = { null, null, null, null, null };
+
+ DistributedFileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
+ final CountingVisitor countingVisitor = new CountingVisitor();
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ results[0] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ fsIn.seek(0);
+ results[1] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(1, countingVisitor.count);
+ countingVisitor.reset();
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ countingVisitor.reset();
+
+ // The mmaps should be of the first block of the file.
+ final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
+ @Override
+ public void accept(ClientMmap mmap) {
+ Assert.assertEquals(firstBlock, mmap.getBlock());
+ }
+ });
+
+ // Read more blocks.
+ results[2] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ results[3] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ try {
+ results[4] = fsIn.read(null, 4096,
+ EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ Assert.fail("expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ // we should have 3 mmaps, 0 evictable
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(3, countingVisitor.count);
+ countingVisitor.reset();
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+
+ // After we close the cursors, the mmaps should be evictable for
+ // a brief period of time. Then, they should be closed (we're
+ // using a very quick timeout)
+ for (ByteBuffer buffer : results) {
+ if (buffer != null) {
+ fsIn.releaseBuffer(buffer);
+ }
+ }
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ countingVisitor.reset();
+ try {
+ mmapManager.visitEvictable(countingVisitor);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return false;
+ }
+ return (0 == countingVisitor.count);
+ }
+ }, 10, 10000);
+ countingVisitor.reset();
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test HDFS fallback reads. HDFS streams support the ByteBufferReadable
+ * interface.
+ */
+ @Test
+ public void testHdfsFallbackReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FSDataInputStream fsIn = null;
+
+ DistributedFileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ testFallbackImpl(fsIn, original);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ private static class RestrictedAllocatingByteBufferPool
+ implements ByteBufferPool {
+ private final boolean direct;
+
+ RestrictedAllocatingByteBufferPool(boolean direct) {
+ this.direct = direct;
+ }
+ @Override
+ public ByteBuffer getBuffer(boolean direct, int length) {
+ Preconditions.checkArgument(this.direct == direct);
+ return direct ? ByteBuffer.allocateDirect(length) :
+ ByteBuffer.allocate(length);
+ }
+ @Override
+ public void putBuffer(ByteBuffer buffer) {
+ }
+ }
+
+ private static void testFallbackImpl(InputStream stream,
+ byte original[]) throws Exception {
+ RestrictedAllocatingByteBufferPool bufferPool =
+ new RestrictedAllocatingByteBufferPool(
+ stream instanceof ByteBufferReadable);
+
+ ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+ Assert.assertEquals(10, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
+ byteBufferToArray(result));
+
+ result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
+ Assert.assertEquals(5000, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
+ byteBufferToArray(result));
+
+ result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
+ Assert.assertEquals(11375, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
+ byteBufferToArray(result));
+
+ result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
+ Assert.assertNull(result);
+ }
+
+ /**
+ * Test the {@link ByteBufferUtil#fallbackRead} function directly.
+ */
+ @Test
+ public void testFallbackRead() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FSDataInputStream fsIn = null;
+
+ DistributedFileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ testFallbackImpl(fsIn, original);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test fallback reads on a stream which does not support the
+ * ByteBufferReadable interface.
+ */
+ @Test
+ public void testIndirectFallbackReads() throws Exception {
+ final File TEST_DIR = new File(
+ System.getProperty("test.build.data","build/test/data"));
+ final String TEST_PATH = TEST_DIR + File.separator +
+ "indirectFallbackTestFile";
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FileOutputStream fos = null;
+ FileInputStream fis = null;
+ try {
+ fos = new FileOutputStream(TEST_PATH);
+ Random random = new Random(RANDOM_SEED);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ random.nextBytes(original);
+ fos.write(original);
+ fos.close();
+ fos = null;
+ fis = new FileInputStream(TEST_PATH);
+ testFallbackImpl(fis, original);
+ } finally {
+ IOUtils.cleanup(LOG, fos, fis);
+ new File(TEST_PATH).delete();
+ }
+ }
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Fri Sep 27 22:51:12 2013
@@ -25,7 +25,6 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -36,11 +35,26 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestBlockReaderLocal {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
+
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
int off2, int len) {
for (int i = 0; i < len; i++) {
@@ -100,10 +114,11 @@ public class TestBlockReaderLocal {
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+ FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
+ fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@@ -138,6 +153,7 @@ public class TestBlockReaderLocal {
test.doTest(blockReaderLocal, original);
} finally {
if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close();
if (checkIn != null) checkIn.close();
@@ -382,10 +398,11 @@ public class TestBlockReaderLocal {
final long RANDOM_SEED = 4567L;
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+ FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
+ fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@@ -417,6 +434,7 @@ public class TestBlockReaderLocal {
} finally {
DFSInputStream.tcpReadsDisabledForTesting = false;
if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (sockDir != null) sockDir.close();
}