Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2016/06/06 18:09:06 UTC

hadoop git commit: HDFS-9890. libhdfs++: Add test suite to simulate network issues. Contributed by Xiaowei Zhu.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 ed43f07ad -> e8f326a51


HDFS-9890. libhdfs++: Add test suite to simulate network issues.  Contributed by Xiaowei Zhu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e8f326a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e8f326a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e8f326a5

Branch: refs/heads/HDFS-8707
Commit: e8f326a51db9665c55807e746d07fc26b7e91abe
Parents: ed43f07
Author: James <jh...@apache.org>
Authored: Mon Jun 6 14:07:57 2016 -0400
Committer: James <jh...@apache.org>
Committed: Mon Jun 6 14:07:57 2016 -0400

----------------------------------------------------------------------
 .../hadoop-hdfs-native-client/pom.xml           |   4 +-
 .../main/native/libhdfs-tests/native_mini_dfs.c |  22 +-
 .../main/native/libhdfs-tests/native_mini_dfs.h |  11 +-
 .../libhdfs-tests/test_libhdfs_mini_stress.c    | 338 +++++++++++++++++++
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     |   3 +
 .../main/native/libhdfspp/lib/fs/filehandle.cc  |  11 +-
 .../main/native/libhdfspp/lib/fs/filehandle.h   |   3 +-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |   2 +-
 .../native/libhdfspp/lib/reader/block_reader.cc |  62 +++-
 .../native/libhdfspp/lib/reader/block_reader.h  |   5 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  |  13 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.h   |  26 +-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |   1 -
 .../main/native/libhdfspp/tests/CMakeLists.txt  |   4 +
 .../native/libhdfspp/tests/bad_datanode_test.cc |   5 +-
 .../src/main/native/libhdfspp/tests/hdfs_shim.c |  48 +--
 .../libhdfspp/tests/libhdfspp_wrapper_defines.h |   1 +
 .../native/libhdfspp/tests/rpc_engine_test.cc   |   2 +-
 18 files changed, 498 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
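
For context on the mechanism: the suite injects failures through libhdfs++'s
file event callback, where returning DEBUG_SIMULATE_ERROR makes the library
behave as if the network failed at that point. A minimal sketch of wiring such
a callback, using only the hdfs_ext.h API that appears in the diff below
(hdfsPreAttachFileMonitor, LIBHDFSPP_EVENT_OK, DEBUG_SIMULATE_ERROR); the
helper name and error ratio are illustrative, not part of this patch:

    #include "hdfspp/hdfs_ext.h"

    #include <cstdint>
    #include <cstdlib>

    // Fail roughly one file event in kErrorRatio. Returning
    // DEBUG_SIMULATE_ERROR tells libhdfs++ to treat the event as a network
    // failure; LIBHDFSPP_EVENT_OK leaves the operation untouched.
    static int simulateNetworkErrors(const char *event, const char *cluster,
                                     const char *file, int64_t value,
                                     int64_t cookie) {
      (void)event; (void)cluster; (void)file; (void)value; (void)cookie;
      const int64_t kErrorRatio = 1000;  // illustrative ratio
      return (random() % kErrorRatio == 0) ? DEBUG_SIMULATE_ERROR
                                           : LIBHDFSPP_EVENT_OK;
    }

    // Registered before opening a file; hdfsOpenFile then attaches the
    // pending callback to the new handle (see the hdfs.cc hunk below):
    //   hdfsPreAttachFileMonitor(&simulateNetworkErrors, 0 /* cookie */);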


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml
index 474e3ac..59773f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/pom.xml
@@ -145,7 +145,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                     <mkdir dir="${project.build.directory}/native"/>
                     <exec executable="cmake" dir="${project.build.directory}/native"
                           failonerror="true">
-                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}  -DHADOOP_BUILD=1 -DREQUIRE_LIBWEBHDFS=${require.libwebhdfs} -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} -G '${generator}'"/>
+                      <arg line="${basedir}/src/ -DCMAKE_BUILD_TYPE=Debug -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}  -DHADOOP_BUILD=1 -DREQUIRE_LIBWEBHDFS=${require.libwebhdfs} -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} -G '${generator}'"/>
                       <arg line="${native_cmake_args}"/>
                     </exec>
                     <exec executable="msbuild" dir="${project.build.directory}/native"
@@ -210,7 +210,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                   <target>
                     <mkdir dir="${project.build.directory}"/>
                     <exec executable="cmake" dir="${project.build.directory}" failonerror="true">
-                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}  -DHADOOP_BUILD=1 -DREQUIRE_LIBWEBHDFS=${require.libwebhdfs} -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} "/>
+                      <arg line="${basedir}/src/ -DCMAKE_BUILD_TYPE=Debug -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}  -DHADOOP_BUILD=1 -DREQUIRE_LIBWEBHDFS=${require.libwebhdfs} -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} "/>
                       <arg line="${native_cmake_args}"/>
                     </exec>
                     <exec executable="make" dir="${project.build.directory}" failonerror="true">

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
index b36ef76..6938109 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
@@ -182,6 +182,16 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
         }
         (*env)->DeleteLocalRef(env, val.l);
     }
+    if (conf->numDataNodes) {
+        jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
+                "numDataNodes", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->numDataNodes);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
+                                  "Builder::numDataNodes");
+            goto error;
+        }
+        (*env)->DeleteLocalRef(env, val.l);
+    }
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
             "build", "()L" MINIDFS_CLUSTER ";");
     if (jthr) {
@@ -291,7 +301,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
     jthrowable jthr;
     int ret = 0;
     const char *host;
-    
+
     if (!env) {
         fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
         return -EIO;
@@ -306,7 +316,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
         return -EIO;
     }
     jNameNode = jVal.l;
-    
+
     // Then get the http address (InetSocketAddress) of the NameNode
     jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE,
                         "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";");
@@ -317,7 +327,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
         goto error_dlr_nn;
     }
     jAddress = jVal.l;
-    
+
     jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
                         JAVA_INETSOCKETADDRESS, "getPort", "()I");
     if (jthr) {
@@ -327,7 +337,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
         goto error_dlr_addr;
     }
     *port = jVal.i;
-    
+
     jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS,
                         "getHostName", "()Ljava/lang/String;");
     if (jthr) {
@@ -339,12 +349,12 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
     host = (*env)->GetStringUTFChars(env, jVal.l, NULL);
     *hostName = strdup(host);
     (*env)->ReleaseStringUTFChars(env, jVal.l, host);
-    
+
 error_dlr_addr:
     (*env)->DeleteLocalRef(env, jAddress);
 error_dlr_nn:
     (*env)->DeleteLocalRef(env, jNameNode);
-    
+
     return ret;
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
index ce8b1cf..628180f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
@@ -26,7 +26,7 @@ extern  "C" {
 #endif
 
 struct hdfsBuilder;
-struct NativeMiniDfsCluster; 
+struct NativeMiniDfsCluster;
 
 /**
  * Represents a configuration to use for creating a Native MiniDFSCluster
@@ -51,6 +51,11 @@ struct NativeMiniDfsConf {
      * Nonzero if we should configure short circuit.
      */
     jboolean configureShortCircuit;
+
+    /**
+     * The number of datanodes in the MiniDfsCluster
+     */
+    jint numDataNodes;
 };
 
 /**
@@ -96,13 +101,13 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
  *
  * @return          the port, or a negative error code
  */
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); 
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
 
 /**
  * Get the http address that's in use by the given (non-HA) nativeMiniDfs
  *
  * @param cl        The initialized NativeMiniDfsCluster
- * @param port      Used to capture the http port of the NameNode 
+ * @param port      Used to capture the http port of the NameNode
  *                  of the NativeMiniDfsCluster
  * @param hostName  Used to capture the http hostname of the NameNode
  *                  of the NativeMiniDfsCluster
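
A minimal sketch of the new knob, following the field order main() uses in the
stress test below (the values are illustrative; nmdCreate only calls
Builder::numDataNodes for a nonzero value, so 0 keeps the MiniDFSCluster
default):

    #include <string.h>
    #include "native_mini_dfs.h"

    static struct NativeMiniDfsCluster *startCluster(void) {
      struct NativeMiniDfsConf conf;
      memset(&conf, 0, sizeof(conf));
      conf.doFormat = 1;      /* format the cluster on startup */
      conf.numDataNodes = 4;  /* new field: start four datanodes */
      return nmdCreate(&conf);
    }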

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
new file mode 100644
index 0000000..71db8ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs/hdfs.h"
+#include "hdfspp/hdfs_ext.h"
+#include "native_mini_dfs.h"
+#include "os/thread.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TLH_MAX_THREADS 10000
+
+#define TLH_MAX_DNS 16
+
+#define TLH_DEFAULT_BLOCK_SIZE 1048576
+
+#define TLH_DEFAULT_DFS_REPLICATION 3
+
+#define TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES 100
+
+#define TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS 5
+
+#ifndef RANDOM_ERROR_RATIO
+#define RANDOM_ERROR_RATIO 1000000000
+#endif
+
+struct tlhThreadInfo {
+    /** Thread index */
+    int threadIdx;
+    /** 0 = thread was successful; error code otherwise */
+    int success;
+    /** thread identifier */
+    thread theThread;
+    /** fs, shared with other threads **/
+    hdfsFS hdfs;
+    /** Filename */
+    const char *fileNm;
+
+};
+
+static int hdfsNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs,
+                               const char *username)
+{
+    int ret;
+    int port;
+    hdfsFS hdfs;
+    struct hdfsBuilder *bld;
+
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+        fprintf(stderr, "hdfsNameNodeConnect: nmdGetNameNodePort "
+                "returned error %d\n", port);
+        return port;
+    }
+    bld = hdfsNewBuilder();
+    if (!bld)
+        return -ENOMEM;
+    hdfsBuilderSetForceNewInstance(bld);
+    hdfsBuilderSetNameNode(bld, "localhost");
+    hdfsBuilderSetNameNodePort(bld, (tPort)port);
+    hdfsBuilderConfSetStr(bld, "dfs.block.size",
+                          TO_STR(TLH_DEFAULT_BLOCK_SIZE));
+    hdfsBuilderConfSetStr(bld, "dfs.blocksize",
+                          TO_STR(TLH_DEFAULT_BLOCK_SIZE));
+    hdfsBuilderConfSetStr(bld, "dfs.replication",
+                          TO_STR(TLH_DEFAULT_DFS_REPLICATION));
+    hdfsBuilderConfSetStr(bld, "ipc.client.connect.max.retries",
+                          TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES));
+    hdfsBuilderConfSetStr(bld, "ipc.client.connect.retry.interval",
+                         TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS));
+    if (username) {
+        hdfsBuilderSetUserName(bld, username);
+    }
+    hdfs = hdfsBuilderConnect(bld);
+    if (!hdfs) {
+        ret = -errno;
+        return ret;
+    }
+    *fs = hdfs;
+    return 0;
+}
+
+static int hdfsWriteData(hdfsFS hdfs, const char *dirNm,
+                         const char *fileNm, tSize fileSz)
+{
+    hdfsFile file;
+    int ret, expected;
+    const char *content;
+
+    content = fileNm;
+
+    if (hdfsExists(hdfs, dirNm) == 0) {
+        EXPECT_ZERO(hdfsDelete(hdfs, dirNm, 1));
+    }
+    EXPECT_ZERO(hdfsCreateDirectory(hdfs, dirNm));
+
+    file = hdfsOpenFile(hdfs, fileNm, O_WRONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+
+    expected = (int)strlen(content);
+    tSize sz = 0;
+    while (sz < fileSz) {
+        ret = hdfsWrite(hdfs, file, content, expected);
+        if (ret < 0) {
+            ret = errno;
+            fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
+            return ret;
+        }
+        if (ret != expected) {
+            fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
+                    "it wrote %d\n", expected, ret);
+            return EIO;
+        }
+        sz += ret;
+    }
+    EXPECT_ZERO(hdfsFlush(hdfs, file));
+    EXPECT_ZERO(hdfsHSync(hdfs, file));
+    EXPECT_ZERO(hdfsCloseFile(hdfs, file));
+    return 0;
+}
+
+static int fileEventCallback(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie)
+{
+    char * randomErrRatioStr = getenv("RANDOM_ERROR_RATIO");
+    int64_t randomErrRatio = RANDOM_ERROR_RATIO;
+    if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr);
+    if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR;
+    else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK;
+    return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK;
+}
+
+static int doTestHdfsMiniStress(struct tlhThreadInfo *ti)
+{
+    char tmp[4096];
+    hdfsFile file;
+    int ret, expected;
+    hdfsFileInfo *fileInfo;
+    uint64_t readOps, nErrs=0;
+    tOffset seekPos;
+    const char *content;
+
+    content = ti->fileNm;
+    expected = (int)strlen(content);
+
+    fileInfo = hdfsGetPathInfo(ti->hdfs, ti->fileNm);
+    EXPECT_NONNULL(fileInfo);
+
+    file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+
+    libhdfspp_file_event_callback callback = &fileEventCallback;
+
+    hdfsPreAttachFileMonitor(callback, 0);
+
+    fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting read loop\n",
+        ti->threadIdx);
+    for (readOps=0; readOps < 1000; ++readOps) {
+        EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
+        file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
+        EXPECT_NONNULL(file);
+        seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected);
+        seekPos = (seekPos / expected) * expected;
+        ret = hdfsSeek(ti->hdfs, file, seekPos);
+        if (ret < 0) {
+            ret = errno;
+            fprintf(stderr, "hdfsSeek to %"PRIu64" failed and set"
+                    " errno %d\n", seekPos, ret);
+            ++nErrs;
+            continue;
+        }
+        ret = hdfsRead(ti->hdfs, file, tmp, expected);
+        if (ret < 0) {
+            ret = errno;
+            fprintf(stderr, "hdfsRead failed and set errno %d\n", ret);
+            ++nErrs;
+            continue;
+        }
+        if (ret != expected) {
+            fprintf(stderr, "hdfsRead was supposed to read %d bytes, but "
+                    "it read %d\n", expected, ret);
+            ++nErrs;
+            continue;
+        }
+        ret = memcmp(content, tmp, expected);
+        if (ret) {
+            fprintf(stderr, "hdfsRead result (%.*s) does not match expected (%.*s)\n",
+                    expected, tmp, expected, content);
+            ++nErrs;
+            continue;
+        }
+    }
+    EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
+    fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): finished read loop\n",
+        ti->threadIdx);
+    EXPECT_ZERO(nErrs);
+    return 0;
+}
+
+static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti)
+{
+    fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n",
+        ti->threadIdx);
+    EXPECT_NONNULL(ti->hdfs);
+    EXPECT_ZERO(doTestHdfsMiniStress(ti));
+    return 0;
+}
+
+static void testHdfsMiniStress(void *v)
+{
+    struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
+    int ret = testHdfsMiniStressImpl(ti);
+    ti->success = ret;
+}
+
+static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
+{
+    int i, threadsFailed = 0;
+    const char *sep = "";
+
+    for (i = 0; i < tlhNumThreads; i++) {
+        if (ti[i].success != 0) {
+            threadsFailed = 1;
+        }
+    }
+    if (!threadsFailed) {
+        fprintf(stderr, "testLibHdfsMiniStress: all threads succeeded.  SUCCESS.\n");
+        return EXIT_SUCCESS;
+    }
+    fprintf(stderr, "testLibHdfsMiniStress: some threads failed: [");
+    for (i = 0; i < tlhNumThreads; i++) {
+        if (ti[i].success != 0) {
+            fprintf(stderr, "%s%d", sep, i);
+            sep = ", ";
+        }
+    }
+    fprintf(stderr, "].  FAILURE.\n");
+    return EXIT_FAILURE;
+}
+
+/**
+ * Test intended to stress libhdfs client with concurrent requests. Currently focused
+ * on concurrent reads.
+ */
+int main(void)
+{
+    int i, tlhNumThreads;
+    char *dirNm, *fileNm;
+    tSize fileSz;
+    const char *tlhNumThreadsStr, *tlhNumDNsStr;
+    hdfsFS hdfs = NULL;
+    struct NativeMiniDfsCluster* tlhCluster;
+    struct tlhThreadInfo ti[TLH_MAX_THREADS];
+    struct NativeMiniDfsConf conf = {
+        1, /* doFormat */
+    };
+
+    dirNm = "/tlhMiniStressData";
+    fileNm = "/tlhMiniStressData/file";
+    fileSz = 2*1024*1024;
+
+    tlhNumDNsStr = getenv("TLH_NUM_DNS");
+    if (!tlhNumDNsStr) {
+        tlhNumDNsStr = "1";
+    }
+    conf.numDataNodes = atoi(tlhNumDNsStr);
+    if ((conf.numDataNodes <= 0) || (conf.numDataNodes > TLH_MAX_DNS)) {
+        fprintf(stderr, "testLibHdfsMiniStress: must have a number of datanodes "
+                "between 1 and %d inclusive, not %d\n",
+                TLH_MAX_DNS, conf.numDataNodes);
+        return EXIT_FAILURE;
+    }
+
+    tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
+    if (!tlhNumThreadsStr) {
+        tlhNumThreadsStr = "8";
+    }
+    tlhNumThreads = atoi(tlhNumThreadsStr);
+    if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
+        fprintf(stderr, "testLibHdfsMiniStress: must have a number of threads "
+                "between 1 and %d inclusive, not %d\n",
+                TLH_MAX_THREADS, tlhNumThreads);
+        return EXIT_FAILURE;
+    }
+    memset(&ti[0], 0, sizeof(ti));
+    for (i = 0; i < tlhNumThreads; i++) {
+        ti[i].threadIdx = i;
+    }
+
+    tlhCluster = nmdCreate(&conf);
+    EXPECT_NONNULL(tlhCluster);
+    EXPECT_ZERO(nmdWaitClusterUp(tlhCluster));
+
+    EXPECT_ZERO(hdfsNameNodeConnect(tlhCluster, &hdfs, NULL));
+
+    // Single threaded writes for now.
+    EXPECT_ZERO(hdfsWriteData(hdfs, dirNm, fileNm, fileSz));
+
+    // Multi-threaded reads.
+    for (i = 0; i < tlhNumThreads; i++) {
+        ti[i].theThread.start = testHdfsMiniStress;
+        ti[i].theThread.arg = &ti[i];
+        ti[i].hdfs = hdfs;
+        ti[i].fileNm = fileNm;
+        EXPECT_ZERO(threadCreate(&ti[i].theThread));
+    }
+    for (i = 0; i < tlhNumThreads; i++) {
+        EXPECT_ZERO(threadJoin(&ti[i].theThread));
+    }
+
+    EXPECT_ZERO(hdfsDisconnect(hdfs));
+    EXPECT_ZERO(nmdShutdown(tlhCluster));
+    nmdFree(tlhCluster);
+    return checkFailures(ti, tlhNumThreads);
+}
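
The test is tunable entirely through the environment: TLH_NUM_DNS sets the
datanode count (1..16, default 1), TLH_NUM_THREADS the reader thread count
(1..10000, default 8), and RANDOM_ERROR_RATIO the injection rate consulted in
fileEventCallback above -- 0 simulates an error on every file event, a
negative value disables injection entirely, and a positive N fails roughly one
event in N (the compile-time default is 1000000000).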

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 0fe8479..3f19ae3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -298,6 +298,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
       Error(stat);
       return nullptr;
     }
+    if (f && fileEventCallback) {
+      f->SetFileEventCallback(fileEventCallback.value());
+    }
     return new hdfsFile_internal(f);
   } catch (const std::exception & e) {
     ReportException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
index 471281a..9f9311f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -231,7 +231,7 @@ void FileHandleImpl::AsyncPreadSome(
   // Wrap the DN in a block reader to handle the state and logic of the
   //    block request protocol
   std::shared_ptr<BlockReader> reader;
-  reader = CreateBlockReader(BlockReaderOptions(), dn);
+  reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_);
 
   // Lambdas cannot capture copies of member variables so we'll make explicit
   //    copies for it
@@ -240,7 +240,7 @@ void FileHandleImpl::AsyncPreadSome(
   auto cluster_name = cluster_name_;
 
   auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
-    auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
+    event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
 #ifndef NDEBUG
     if (event_resp.response() == event_response::kTest_Error) {
       handler(event_resp.status(), dn_id, transferred);
@@ -254,7 +254,7 @@ void FileHandleImpl::AsyncPreadSome(
   auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
           (Status status, std::shared_ptr<DataNodeConnection> dn) {
     (void)dn;
-    auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
+    event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
 #ifndef NDEBUG
     if (event_resp.response() == event_response::kTest_Error) {
       status = event_resp.status();
@@ -276,9 +276,10 @@ void FileHandleImpl::AsyncPreadSome(
 }
 
 std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
-                                               std::shared_ptr<DataNodeConnection> dn)
+                                                               std::shared_ptr<DataNodeConnection> dn,
+                                                               std::shared_ptr<LibhdfsEvents> event_handlers)
 {
-  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_);
+  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers);
 
   LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
                          << ", ..., dnconn=" << dn.get()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
index a99550a..57cf4b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -119,7 +119,8 @@ public:
 
 protected:
   virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
-                                                 std::shared_ptr<DataNodeConnection> dn);
+                                                         std::shared_ptr<DataNodeConnection> dn,
+                                                         std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
   virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
       ::asio::io_service *io_service,
       const ::hadoop::hdfs::DatanodeInfoProto & dn,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index b05bc3d..0d4be41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -96,7 +96,7 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n
   io_service = nullptr;
 
   /* spawn background threads for asio delegation */
-  unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
+  unsigned int threads = 2 /* options.io_threads_, pending HDFS-9117 */;
   for (unsigned int i = 0; i < threads; i++) {
     AddWorkerThread();
   }
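
The bump from 1 to 2 asio worker threads is not called out in the summary;
presumably it keeps the stress test's concurrent reads from serializing on a
single worker while io_threads_ remains hardcoded pending HDFS-9117.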

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
index 5052951..defcc1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -24,7 +24,6 @@
 
 #include <future>
 
-
 namespace hdfs {
 
 #define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
@@ -105,7 +104,17 @@ void BlockReaderImpl::AsyncRequestBlock(
   m->Run([this, handler, offset](const Status &status, const State &s) {    Status stat = status;
     if (stat.ok()) {
       const auto &resp = s.response;
-      if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+
+    if(this->event_handlers_) {
+      event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef NDEBUG
+      if (stat.ok() && event_resp.response() == event_response::kTest_Error) {
+        stat = Status::Error("Test error");
+      }
+#endif
+    }
+
+      if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
         if (resp.has_readopchecksuminfo()) {
           const auto &checksum_info = resp.readopchecksuminfo();
           chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
@@ -162,6 +171,14 @@ struct BlockReaderImpl::ReadPacketHeader
         assert(v && "Failed to parse the header");
         parent_->state_ = kReadChecksum;
       }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef NDEBUG
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     };
 
@@ -214,7 +231,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
       return;
     }
 
-    auto handler = [parent, next](const asio::error_code &ec, size_t) {
+    auto handler = [parent, next, this](const asio::error_code &ec, size_t) {
       Status status;
       if (ec) {
         status = Status(ec.value(), ec.message().c_str());
@@ -222,6 +239,14 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
         parent->state_ =
             parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
       }
+      if(parent->event_handlers_) {
+        event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef NDEBUG
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     };
     parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
@@ -248,7 +273,6 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
   virtual void Run(const Next &next) override {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
                             << FMT_CONT_AND_PARENT_ADDR << ") called");
-
     auto handler =
         [next, this](const asio::error_code &ec, size_t transferred) {
           Status status;
@@ -261,6 +285,14 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
           if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
             parent_->state_ = kReadPacketHeader;
           }
+          if(parent_->event_handlers_) {
+            event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef NDEBUG
+            if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+                status = Status::Error("Test error");
+            }
+#endif
+          }
           next(status);
         };
 
@@ -292,13 +324,22 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation {
       return;
     }
 
-    auto h = [next, this](const Status &status) {
+    auto h = [next, this](const Status &stat) {
+      Status status = stat;
       if (status.ok()) {
         assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
                parent_->chunk_padding_bytes_);
         parent_->chunk_padding_bytes_ = 0;
         parent_->state_ = kReadData;
       }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef NDEBUG
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     };
     read_data_->Run(h);
@@ -334,11 +375,20 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
     m->Push(
         continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
 
-    m->Run([this, next](const Status &status,
+    m->Run([this, next](const Status &stat,
                         const hadoop::hdfs::ClientReadStatusProto &) {
+      Status status = stat;
       if (status.ok()) {
         parent_->state_ = BlockReaderImpl::kFinished;
       }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef NDEBUG
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     });
   }
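
The same injection idiom now appears in AsyncRequestBlock, ReadPacketHeader,
ReadChecksum, ReadData, ReadPadding, and AckRead. Factored out as a sketch for
readability (the helper name is hypothetical; the body matches the hunks
above):

    // Hypothetical helper equivalent to the guard repeated above: fire the
    // FILE_DN_READ_EVENT handler and, in debug builds only, let a
    // kTest_Error response turn an otherwise-OK status into a simulated
    // failure.
    static hdfs::Status InjectReadError(hdfs::LibhdfsEvents *handlers,
                                        hdfs::Status status) {
      if (handlers) {
        hdfs::event_response resp =
            handlers->call(FILE_DN_READ_EVENT, "", "", 0);
    #ifndef NDEBUG
        if (status.ok() &&
            resp.response() == hdfs::event_response::kTest_Error) {
          status = hdfs::Status::Error("Test error");
        }
    #endif
      }
      return status;
    }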

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
index f9794b1..b5cbdf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -93,9 +93,9 @@ class BlockReaderImpl
     : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
 public:
   explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn,
-                           CancelHandle cancel_state)
+                           CancelHandle cancel_state, std::shared_ptr<LibhdfsEvents> event_handlers=nullptr)
       : dn_(dn), state_(kOpen), options_(options),
-        chunk_padding_bytes_(0), cancel_state_(cancel_state) {}
+        chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {}
 
   virtual void AsyncReadPacket(
     const MutableBuffers &buffers,
@@ -152,6 +152,7 @@ private:
   long long bytes_to_read_;
   std::vector<char> checksum_;
   CancelHandle cancel_state_;
+  LibhdfsEvents* event_handlers_;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
index 749195a..a72d194 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -274,9 +274,18 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
   }
 
   Status status;
-  if (h.has_exceptionclassname()) {
+  if(event_handlers_) {
+    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
+#ifndef NDEBUG
+    if (event_resp.response() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+  }
+
+  if (status.ok() && h.has_exceptionclassname()) {
     status =
-        Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
+      Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
   }
 
   io_service().post([req, response, status]() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 255b98b..70a96b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -30,6 +30,8 @@
 #include <asio/read.hpp>
 #include <asio/write.hpp>
 
+#include <system_error>
+
 namespace hdfs {
 
 template <class NextLayer>
@@ -63,6 +65,10 @@ public:
   NextLayer next_layer_;
 
   void ConnectComplete(const ::asio::error_code &ec);
+
+  // Hide default ctors.
+  RpcConnectionImpl();
+  RpcConnectionImpl(const RpcConnectionImpl &other);
 };
 
 template <class NextLayer>
@@ -70,7 +76,7 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
     : RpcConnection(engine),
       options_(engine->options()),
       next_layer_(engine->io_service()) {
-    LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
+    LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
 }
 
 template <class NextLayer>
@@ -84,7 +90,6 @@ RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
     LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
 }
 
-
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Connect(
     const std::vector<::asio::ip::tcp::endpoint> &server,
@@ -145,7 +150,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
 
   Status status = ToStatus(ec);
   if(event_handlers_) {
-    auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
+    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
 #ifndef NDEBUG
     if (event_resp.response() == event_response::kTest_Error) {
       status = event_resp.status();
@@ -310,27 +315,28 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
 
 
 template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asio_ec,
+void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec,
                                                    size_t) {
   using std::placeholders::_1;
   using std::placeholders::_2;
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
 
+  ::asio::error_code my_ec(original_ec);
+
   LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
 
   std::shared_ptr<RpcConnection> shared_this = shared_from_this();
 
-  ::asio::error_code ec = asio_ec;
   if(event_handlers_) {
-    auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
+    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
 #ifndef NDEBUG
     if (event_resp.response() == event_response::kTest_Error) {
-        ec = std::make_error_code(std::errc::network_down);
+      my_ec = std::make_error_code(std::errc::network_down);
     }
 #endif
   }
 
-  switch (ec.value()) {
+  switch (my_ec.value()) {
     case 0:
       // No errors
       break;
@@ -338,8 +344,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asi
       // The event loop has been shut down. Ignore the error.
       return;
     default:
-      LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message());
-      CommsError(ToStatus(ec));
+      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
+      CommsError(ToStatus(my_ec));
       return;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 066c01f..5f7e618 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -229,7 +229,6 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   std::shared_ptr<LibhdfsEvents> event_handlers_;
   std::string cluster_name_;
 
-
   // Lock for mutable parts of this class that need to be thread safe
   std::mutex connection_state_lock_;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index b30afb9..45bbeb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -138,6 +138,10 @@ build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT
 link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
 add_libhdfs_test  (hdfspp_mini_dfs_smoke hdfspp_test_shim_static)
 
+build_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static expect.c test_libhdfs_mini_stress.c ${OS_DIR}/thread.c)
+link_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
+add_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static)
+
 build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc)
 link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY}  ${SASL_LIBRARIES})
 add_libhdfs_test  (hdfs_ext hdfspp_test_shim_static)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index 01d723f..9e3aeb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -93,9 +93,10 @@ public:
   std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
 protected:
   std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
-                                                 std::shared_ptr<DataNodeConnection> dn) override
+                                                 std::shared_ptr<DataNodeConnection> dn,
+                                                 std::shared_ptr<hdfs::LibhdfsEvents> event_handlers) override
   {
-    (void) options; (void) dn;
+      (void) options; (void) dn; (void) event_handlers;
     assert(mock_reader_);
     return mock_reader_;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
index 0737d08..7613bf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
@@ -24,15 +24,9 @@
 #include <stdlib.h>
 #include <string.h>
 
-/* Cheat for now and use the same hdfsBuilder as libhdfs */
-/* (libhdfspp doesn't have an hdfsBuilder yet). */
 struct hdfsBuilder {
-    int forceNewInstance;
-    const char *nn;
-    tPort port;
-    const char *kerbTicketCachePath;
-    const char *userName;
-    struct hdfsBuilderConfOpt *opts;
+  struct libhdfs_hdfsBuilder * libhdfsBuilder;
+  struct libhdfspp_hdfsBuilder * libhdfsppBuilder;
 };
 
 /* Shim structs and functions that delegate to libhdfspp and libhdfs. */
@@ -98,13 +92,13 @@ hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
 
 hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
   hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
-  ret->libhdfsppRep = libhdfspp_hdfsConnect(bld->nn, bld->port);
+  ret->libhdfsppRep = libhdfspp_hdfsBuilderConnect(bld->libhdfsppBuilder);
   if (!ret->libhdfsppRep) {
     free(ret);
     ret = NULL;
   } else {
     /* Destroys bld object. */
-    ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld);
+    ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld->libhdfsBuilder);
     if (!ret->libhdfsRep) {
       libhdfspp_hdfsDisconnect(ret->libhdfsppRep);
       free(ret);
@@ -115,49 +109,61 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
 }
 
 struct hdfsBuilder *hdfsNewBuilder(void) {
-  return libhdfs_hdfsNewBuilder();
+  struct hdfsBuilder * ret = calloc(1, sizeof(struct hdfsBuilder));
+  ret->libhdfsppBuilder = libhdfspp_hdfsNewBuilder();
+  ret->libhdfsBuilder = libhdfs_hdfsNewBuilder();
+  return ret;
 }
 
 void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
-  libhdfs_hdfsBuilderSetForceNewInstance(bld);
+  libhdfs_hdfsBuilderSetForceNewInstance(bld->libhdfsBuilder);
+//  libhdfspp_hdfsBuilderSetForceNewInstance(bld->libhdfsppBuilder);
 }
 
 void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) {
-  libhdfs_hdfsBuilderSetNameNode(bld, nn);
+  libhdfs_hdfsBuilderSetNameNode(bld->libhdfsBuilder, nn);
+  libhdfspp_hdfsBuilderSetNameNode(bld->libhdfsppBuilder, nn);
 }
 
 void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) {
-  libhdfs_hdfsBuilderSetNameNodePort(bld, port);
+  libhdfs_hdfsBuilderSetNameNodePort(bld->libhdfsBuilder, port);
+  libhdfspp_hdfsBuilderSetNameNodePort(bld->libhdfsppBuilder, port);
 }
 
 void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) {
-  libhdfs_hdfsBuilderSetUserName(bld, userName);
+  libhdfs_hdfsBuilderSetUserName(bld->libhdfsBuilder, userName);
+  libhdfspp_hdfsBuilderSetUserName(bld->libhdfsppBuilder, userName);
 }
 
 void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
                                const char *kerbTicketCachePath) {
-  libhdfs_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
+  libhdfs_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsBuilder, kerbTicketCachePath);
+//  libhdfspp_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsppBuilder, kerbTicketCachePath);
 }
 
 void hdfsFreeBuilder(struct hdfsBuilder *bld) {
-  libhdfs_hdfsFreeBuilder(bld);
+  libhdfs_hdfsFreeBuilder(bld->libhdfsBuilder);
+  libhdfspp_hdfsFreeBuilder(bld->libhdfsppBuilder);
+  free(bld);
 }
 
 int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
                           const char *val) {
-  return libhdfs_hdfsBuilderConfSetStr(bld, key, val);
+  fprintf(stderr, "hdfs_shim::hdfsBuilderConfSetStr: key=%s val=%s\n", key, val);
+  libhdfs_hdfsBuilderConfSetStr(bld->libhdfsBuilder, key, val);
+  return libhdfspp_hdfsBuilderConfSetStr(bld->libhdfsppBuilder, key, val);
 }
 
 int hdfsConfGetStr(const char *key, char **val) {
-  return libhdfs_hdfsConfGetStr(key, val);
+  return libhdfspp_hdfsConfGetStr(key, val);
 }
 
 int hdfsConfGetInt(const char *key, int32_t *val) {
-  return libhdfs_hdfsConfGetInt(key, val);
+  return libhdfspp_hdfsConfGetInt(key, val);
 }
 
 void hdfsConfStrFree(char *val) {
-  libhdfs_hdfsConfStrFree(val);
+  libhdfspp_hdfsConfStrFree(val);
 }
 
 int hdfsDisconnect(hdfsFS fs) {
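
The shim's hdfsBuilder is no longer a copy of the libhdfs struct: it now holds
one builder per library, so name node, port, user name, and conf options set
through the builder reach both clients, and hdfsBuilderConnect connects each
side with its own builder. The conf accessors (hdfsConfGetStr, hdfsConfGetInt,
hdfsConfStrFree) now route to libhdfspp rather than libhdfs, while the
force-new-instance and Kerberos ticket cache setters stay libhdfs-only,
apparently because libhdfs++ does not expose them yet (hence the commented-out
calls).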

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
index 0d50fda..7aa33e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
@@ -81,6 +81,7 @@
 #define hadoopRzBufferLength libhdfspp_hadoopRzBufferLength
 #define hadoopRzBufferGet libhdfspp_hadoopRzBufferGet
 #define hadoopRzBufferFree libhdfspp_hadoopRzBufferFree
+#define hdfsBuilder libhdfspp_hdfsBuilder
 #define hdfs_internal libhdfspp_hdfs_internal
 #define hdfsFS libhdfspp_hdfsFS
 #define hdfsFile_internal libhdfspp_hdfsFile_internal

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8f326a5/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index b5f4d9a..defe95d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -386,7 +386,7 @@ TEST(RpcEngineTest, TestEventCallbacks)
   });
   io_service.run();
   ASSERT_TRUE(complete);
-  ASSERT_EQ(7, callbacks.size());
+  ASSERT_EQ(8, callbacks.size());
   ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
   ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
   ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
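
The expected callback count rises from 7 to 8 because
RpcConnection::HandleRpcResponse (rpc_connection.cc above) now fires an
FS_NN_READ_EVENT of its own when a response is decoded, adding one event to
the sequence this test records.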

