Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/25 01:21:27 UTC

[3/4] HDFS-7014. Implement input streams and file system functionality (zhwangzw via cmccabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc
new file mode 100644
index 0000000..3168fe4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FileSystemImpl.h"
+#include "InputStream.h"
+#include "InputStreamImpl.h"
+#include "StatusInternal.h"
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+
+InputStream::InputStream() {
+    impl = new internal::InputStreamImpl;
+}
+
+InputStream::~InputStream() {
+    delete impl;
+}
+
+Status InputStream::open(FileSystem &fs, const std::string &path,
+                         bool verifyChecksum) {
+    try {
+        if (!fs.impl) {
+            THROW(HdfsIOException, "FileSystem: not connected.");
+        }
+
+        impl->open(fs.impl, path.c_str(), verifyChecksum);
+    } catch (...) {
+        return lastError = CreateStatusFromException(current_exception());
+    }
+
+    return lastError = Status::OK();
+}
+
+int32_t InputStream::read(char *buf, int32_t size) {
+    int32_t retval = -1;
+
+    try {
+        retval = impl->read(buf, size);
+        lastError = Status::OK();
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+    }
+
+    return retval;
+}
+
+Status InputStream::readFully(char *buf, int64_t size) {
+    try {
+        impl->readFully(buf, size);
+    } catch (...) {
+        return lastError = CreateStatusFromException(current_exception());
+    }
+
+    return lastError = Status::OK();
+}
+
+int64_t InputStream::available() {
+    int64_t retval = -1;
+
+    try {
+        retval = impl->available();
+        lastError = Status::OK();
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+    }
+
+    return retval;
+}
+
+Status InputStream::seek(int64_t pos) {
+    try {
+        impl->seek(pos);
+    } catch (...) {
+        return lastError = CreateStatusFromException(current_exception());
+    }
+
+    return lastError = Status::OK();
+}
+
+int64_t InputStream::tell() {
+    int64_t retval = -1;
+
+    try {
+        retval = impl->tell();
+        lastError = Status::OK();
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+    }
+
+    return retval;
+}
+
+Status InputStream::close() {
+    try {
+        impl->close();
+    } catch (...) {
+        return lastError = CreateStatusFromException(current_exception());
+    }
+
+    return lastError = Status::OK();
+}
+
+Status InputStream::getLastError() {
+    return lastError;
+}
+}

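Every public method above follows the same exception-to-Status bridge: call
into the impl, catch whatever it throws, turn the in-flight exception into a
Status, and cache it in lastError before handing it back. Below is a minimal
self-contained sketch of that idiom; Status and StatusFromCurrentException()
here are simplified stand-ins for libhdfs3's Status and
CreateStatusFromException(), not the real classes:

    #include <cerrno>
    #include <exception>
    #include <stdexcept>
    #include <string>

    // Simplified stand-in for libhdfs3's Status.
    struct Status {
        int code;
        std::string msg;
        static Status OK() { return Status{0, ""}; }
        bool ok() const { return code == 0; }
    };

    // Stand-in for CreateStatusFromException(). Only valid while an
    // exception is active, because "throw;" rethrows the current one.
    static Status StatusFromCurrentException() {
        try {
            throw;
        } catch (const std::exception &e) {
            return Status{EIO, e.what()};
        } catch (...) {
            return Status{EIO, "unknown exception"};
        }
    }

    struct Stream {
        Status lastError;

        // Same shape as InputStream::seek() above.
        Status seek(long pos) {
            try {
                if (pos < 0) throw std::invalid_argument("negative seek");
                // ... delegate to the real implementation here ...
            } catch (...) {
                return lastError = StatusFromCurrentException();
            }
            return lastError = Status::OK();
        }
    };

The "return lastError = ..." form records the error for getLastError() and
returns it to the caller in a single expression.
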
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
index ddd9434..bd7a5dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
@@ -19,20 +19,28 @@
 #ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_
 #define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_
 
-#include "FileSystem.h"
+#include "Status.h"
 
 namespace hdfs {
 namespace internal {
-class InputStreamInter;
+class InputStreamImpl;
 }
 
+class FileSystem;
+
 /**
  * An input stream used to read data from hdfs.
  */
 class InputStream {
 public:
+    /**
+     * Construct an instance.
+     */
     InputStream();
 
+    /**
+     * Destroy this instance.
+     */
     ~InputStream();
 
     /**
@@ -40,51 +48,66 @@ public:
      * @param fs hdfs file system.
      * @param path the file to be read.
      * @param verifyChecksum verify the checksum.
+     * @return the result status of this operation
      */
-    void open(FileSystem & fs, const char * path, bool verifyChecksum = true);
+    Status open(FileSystem &fs, const std::string &path,
+                bool verifyChecksum = true);
 
     /**
      * To read data from hdfs.
      * @param buf the buffer to be filled.
      * @param size buffer size.
-     * @return return the number of bytes filled in the buffer, it may less than size.
+     * @return the number of bytes filled into the buffer; it may be less
+     * than size, or -1 on error.
      */
-    int32_t read(char * buf, int32_t size);
+    int32_t read(char *buf, int32_t size);
 
     /**
      * To read data from hdfs, blocking until the given number of bytes is read.
      * @param buf the buffer to be filled.
      * @param size the number of bytes to be read.
+     * @return the result status of this operation
      */
-    void readFully(char * buf, int64_t size);
+    Status readFully(char *buf, int64_t size);
 
     /**
      * Get how many bytes can be read without blocking.
-     * @return The number of bytes can be read without blocking.
+     * @return The number of bytes that can be read without blocking, or -1
+     * on error.
      */
     int64_t available();
 
     /**
      * To move the file pointer to the given position.
      * @param pos the given position.
+     * @return the result status of this operation
      */
-    void seek(int64_t pos);
+    Status seek(int64_t pos);
 
     /**
      * To get the current file pointer position.
-     * @return the position of current file point.
+     * @return the position of the current file pointer, or -1 on error.
      */
     int64_t tell();
 
     /**
      * Close the stream.
+     * @return the result status of this operation
      */
-    void close();
+    Status close();
+
+    /**
+     * Get the error status of the last operation.
+     * @return the error status of the last operation.
+     */
+    Status getLastError();
 
 private:
-    Internal::InputStreamInter * impl;
-};
+    InputStream(const InputStream &other);
+    InputStream &operator=(InputStream &other);
 
+    internal::InputStreamImpl *impl;
+    Status lastError;
+};
 }
 
 #endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ */

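Taken together, the header now gives callers an exception-free surface. A
sketch of typical client code against it follows; the ReadHead helper, the
assumption that the FileSystem is already connected, and the assumption that
Status exposes an ok()-style predicate are illustrative, not part of this
patch:

    #include <stdint.h>
    #include <string>

    #include "FileSystem.h"
    #include "InputStream.h"

    // Hypothetical helper: read up to `size` bytes from the head of a file.
    hdfs::Status ReadHead(hdfs::FileSystem &fs, const std::string &path,
                          char *buf, int32_t size) {
        hdfs::InputStream in;
        hdfs::Status s = in.open(fs, path, true /* verifyChecksum */);

        if (!s.ok()) {
            return s;
        }

        int32_t n = in.read(buf, size);  // -1 on error per the new contract

        if (n < 0) {
            hdfs::Status err = in.getLastError();
            in.close();
            return err;
        }

        return in.close();
    }
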
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc
new file mode 100644
index 0000000..ca4c645
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc
@@ -0,0 +1,919 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemImpl.h"
+#include "InputStreamImpl.h"
+#include "LocalBlockReader.h"
+#include "Logger.h"
+#include "RemoteBlockReader.h"
+#include "Thread.h"
+#include "UnorderedMap.h"
+#include "server/Datanode.h"
+
+#include <algorithm>
+#include <arpa/inet.h>
+#include <ifaddrs.h>
+#include <inttypes.h>
+#include <iostream>
+#include <limits>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace hdfs {
+namespace internal {
+
+mutex InputStreamImpl::MutLocalBlockInforCache;
+unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>>
+    InputStreamImpl::LocalBlockInforCache;
+
+unordered_set<std::string> BuildLocalAddrSet() {
+    unordered_set<std::string> set;
+    struct ifaddrs *ifAddr = NULL;
+    struct ifaddrs *pifAddr = NULL;
+    struct sockaddr *addr;
+
+    if (getifaddrs(&ifAddr)) {
+        THROW(HdfsNetworkException,
+              "InputStreamImpl: cannot get local network interface: %s",
+              GetSystemErrorInfo(errno));
+    }
+
+    try {
+        std::vector<char> host;
+        const char *pHost;
+        host.resize(INET6_ADDRSTRLEN + 1);
+
+        for (pifAddr = ifAddr; pifAddr != NULL; pifAddr = pifAddr->ifa_next) {
+            addr = pifAddr->ifa_addr;
+
+            if (NULL == addr) {
+                continue;  // interface without an address
+            }
+
+            memset(&host[0], 0, INET6_ADDRSTRLEN + 1);
+
+            if (addr->sa_family == AF_INET) {
+                pHost = inet_ntop(
+                    addr->sa_family,
+                    &(reinterpret_cast<struct sockaddr_in *>(addr))->sin_addr,
+                    &host[0], INET6_ADDRSTRLEN);
+            } else if (addr->sa_family == AF_INET6) {
+                pHost = inet_ntop(
+                    addr->sa_family,
+                    &(reinterpret_cast<struct sockaddr_in6 *>(addr))->sin6_addr,
+                    &host[0], INET6_ADDRSTRLEN);
+            } else {
+                continue;
+            }
+
+            if (NULL == pHost) {
+                THROW(HdfsNetworkException,
+                      "InputStreamImpl: cannot get convert network address "
+                      "to textual form: %s",
+                      GetSystemErrorInfo(errno));
+            }
+
+            set.insert(pHost);
+        }
+
+        /*
+         * add hostname.
+         */
+        long hostlen = sysconf(_SC_HOST_NAME_MAX);
+        host.resize(hostlen + 1);
+
+        if (gethostname(&host[0], host.size())) {
+            THROW(HdfsNetworkException,
+                  "InputStreamImpl: cannot get hostname: %s",
+                  GetSystemErrorInfo(errno));
+        }
+
+        set.insert(&host[0]);
+    } catch (...) {
+        if (ifAddr != NULL) {
+            freeifaddrs(ifAddr);
+        }
+
+        throw;
+    }
+
+    if (ifAddr != NULL) {
+        freeifaddrs(ifAddr);
+    }
+
+    return set;
+}
+
+InputStreamImpl::InputStreamImpl()
+    : closed(true),
+      localRead(true),
+      readFromUnderConstructedBlock(false),
+      verify(true),
+      maxGetBlockInfoRetry(3),
+      cursor(0),
+      endOfCurBlock(0),
+      lastBlockBeingWrittenLength(0),
+      prefetchSize(0) {
+#ifdef MOCK
+    stub = NULL;
+#endif
+}
+
+InputStreamImpl::~InputStreamImpl() {
+}
+
+void InputStreamImpl::checkStatus() {
+    if (closed) {
+        THROW(HdfsIOException, "InputStreamImpl: stream is not opened.");
+    }
+
+    if (lastError != exception_ptr()) {
+        rethrow_exception(lastError);
+    }
+}
+
+int64_t InputStreamImpl::readBlockLength(const LocatedBlock &b) {
+    const std::vector<DatanodeInfo> &nodes = b.getLocations();
+    int replicaNotFoundCount = nodes.size();
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        try {
+            int64_t n = 0;
+            shared_ptr<Datanode> dn;
+            RpcAuth a = auth;
+            a.getUser().addToken(b.getToken());
+#ifdef MOCK
+
+            if (stub) {
+                dn = stub->getDatanode();
+            } else {
+                dn = shared_ptr<Datanode>(
+                    new DatanodeImpl(nodes[i].getIpAddr().c_str(),
+                                     nodes[i].getIpcPort(), *conf, a));
+            }
+
+#else
+            dn = shared_ptr<Datanode>(new DatanodeImpl(
+                nodes[i].getIpAddr().c_str(), nodes[i].getIpcPort(), *conf, a));
+#endif
+            n = dn->getReplicaVisibleLength(b);
+
+            if (n >= 0) {
+                return n;
+            }
+        } catch (const ReplicaNotFoundException &e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block "
+                "visible length for Block: %s file %s from Datanode: %s\n%s",
+                b.toString().c_str(), path.c_str(),
+                nodes[i].formatAddress().c_str(), GetExceptionDetail(e));
+            LOG(INFO,
+                "InputStreamImpl: retry get block visible length for Block: "
+                "%s file %s from other datanode",
+                b.toString().c_str(), path.c_str());
+            --replicaNotFoundCount;
+        } catch (const HdfsIOException &e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block visible length for "
+                "Block: %s file %s from Datanode: %s\n%s",
+                b.toString().c_str(), path.c_str(),
+                nodes[i].formatAddress().c_str(), GetExceptionDetail(e));
+            LOG(INFO,
+                "InputStreamImpl: retry get block visible length for Block: "
+                "%s file %s from other datanode",
+                b.toString().c_str(), path.c_str());
+        }
+    }
+
+    // The namenode told us about these locations, but none of them knows
+    // about the replica; that means we hit the race between pipeline
+    // creation start and end. We require "replica not found" from every
+    // node, because some other exception could have happened on a DN
+    // that does have the replica, and we want to report that error.
+    if (replicaNotFoundCount == 0) {
+        return 0;
+    }
+
+    return -1;
+}
+
+/**
+ * Get block location information from the namenode.
+ */
+void InputStreamImpl::updateBlockInfos() {
+    int retry = maxGetBlockInfoRetry;
+
+    for (int i = 0; i < retry; ++i) {
+        try {
+            if (!lbs) {
+                lbs = shared_ptr<LocatedBlocks>(new LocatedBlocks);
+            }
+
+            filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs);
+
+            if (lbs->isLastBlockComplete()) {
+                lastBlockBeingWrittenLength = 0;
+            } else {
+                shared_ptr<LocatedBlock> last = lbs->getLastBlock();
+
+                if (!last) {
+                    lastBlockBeingWrittenLength = 0;
+                } else {
+                    lastBlockBeingWrittenLength = readBlockLength(*last);
+
+                    if (lastBlockBeingWrittenLength == -1) {
+                        if (i + 1 >= retry) {
+                            THROW(HdfsIOException,
+                                  "InputStreamImpl: failed "
+                                  "to get block visible length for Block: "
+                                  "%s from all Datanode.",
+                                  last->toString().c_str());
+                        } else {
+                            LOG(LOG_ERROR,
+                                "InputStreamImpl: failed to get block visible "
+                                "length for Block: %s file %s from all "
+                                "Datanode.",
+                                last->toString().c_str(), path.c_str());
+
+                            try {
+                                sleep_for(milliseconds(4000));
+                            } catch (...) {
+                            }
+
+                            continue;
+                        }
+                    }
+
+                    last->setNumBytes(lastBlockBeingWrittenLength);
+                }
+            }
+
+            return;
+        } catch (const HdfsRpcException &e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block information "
+                "for file %s, %s",
+                path.c_str(), GetExceptionDetail(e));
+
+            if (i + 1 >= retry) {
+                throw;
+            }
+        }
+
+        LOG(INFO,
+            "InputStreamImpl: retry to get block information for "
+            "file: %s, already tried %d time(s).",
+            path.c_str(), i + 1);
+    }
+}
+
+int64_t InputStreamImpl::getFileLength() {
+    int64_t length = lbs->getFileLength();
+
+    if (!lbs->isLastBlockComplete()) {
+        length += lastBlockBeingWrittenLength;
+    }
+
+    return length;
+}
+
+void InputStreamImpl::seekToBlock(const LocatedBlock &lb) {
+    if (cursor >= lbs->getFileLength()) {
+        assert(!lbs->isLastBlockComplete());
+        readFromUnderConstructedBlock = true;
+    } else {
+        readFromUnderConstructedBlock = false;
+    }
+
+    assert(cursor >= lb.getOffset() &&
+           cursor < lb.getOffset() + lb.getNumBytes());
+    curBlock = shared_ptr<LocatedBlock>(new LocatedBlock(lb));
+    int64_t blockSize = curBlock->getNumBytes();
+    assert(blockSize > 0);
+    endOfCurBlock = blockSize + curBlock->getOffset();
+    failedNodes.clear();
+    blockReader.reset();
+}
+
+bool InputStreamImpl::choseBestNode() {
+    const std::vector<DatanodeInfo> &nodes = curBlock->getLocations();
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        if (std::binary_search(failedNodes.begin(), failedNodes.end(),
+                               nodes[i])) {
+            continue;
+        }
+
+        curNode = nodes[i];
+        return true;
+    }
+
+    return false;
+}
+
+bool InputStreamImpl::isLocalNode() {
+    static const unordered_set<std::string> LocalAddrSet = BuildLocalAddrSet();
+    bool retval = LocalAddrSet.find(curNode.getIpAddr()) != LocalAddrSet.end();
+    return retval;
+}
+
+BlockLocalPathInfo InputStreamImpl::getBlockLocalPathInfo(
+    LocalBlockInforCacheType &cache, const LocatedBlock &b) {
+    BlockLocalPathInfo retval;
+
+    try {
+        if (!cache.find(LocalBlockInforCacheKey(b.getBlockId(), b.getPoolId()),
+                        retval)) {
+            RpcAuth a = auth;
+            /*
+             * Only Kerberos-based authentication is allowed here;
+             * do not add a token.
+             */
+            shared_ptr<Datanode> dn = shared_ptr<Datanode>(new DatanodeImpl(
+                curNode.getIpAddr().c_str(), curNode.getIpcPort(), *conf, a));
+            dn->getBlockLocalPathInfo(b, b.getToken(), retval);
+            cache.insert(LocalBlockInforCacheKey(b.getBlockId(), b.getPoolId()),
+                         retval);
+        }
+    } catch (const HdfsIOException &e) {
+        throw;
+    } catch (const HdfsException &e) {
+        NESTED_THROW(
+            HdfsIOException,
+            "InputStreamImpl: Failed to get block local path information.");
+    }
+
+    return retval;
+}
+
+void InputStreamImpl::invalidCacheEntry(LocalBlockInforCacheType &cache,
+                                        const LocatedBlock &b) {
+    cache.erase(LocalBlockInforCacheKey(b.getBlockId(), b.getPoolId()));
+}
+
+LocalBlockInforCacheType &InputStreamImpl::getBlockLocalPathInfoCache(
+    uint32_t port) {
+    lock_guard<mutex> lock(MutLocalBlockInforCache);
+    unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>>::iterator it;
+    it = LocalBlockInforCache.find(port);
+
+    if (it == LocalBlockInforCache.end()) {
+        shared_ptr<LocalBlockInforCacheType> retval;
+        retval =
+            shared_ptr<LocalBlockInforCacheType>(new LocalBlockInforCacheType(
+                conf->getMaxLocalBlockInfoCacheSize()));
+        LocalBlockInforCache[port] = retval;
+        return *retval;
+    } else {
+        return *(it->second);
+    }
+}
+
+void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) {
+    bool lastReadFromLocal = false;
+    exception_ptr lastException;
+
+    while (true) {
+        if (!choseBestNode()) {
+            try {
+                if (lastException) {
+                    rethrow_exception(lastException);
+                }
+            } catch (...) {
+                NESTED_THROW(
+                    HdfsIOException,
+                    "InputStreamImpl: all nodes have been tried and no valid "
+                    "replica can be read for Block: %s.",
+                    curBlock->toString().c_str());
+            }
+
+            THROW(HdfsIOException,
+                  "InputStreamImpl: all nodes have been tried and no valid "
+                  "replica can be read for Block: %s.",
+                  curBlock->toString().c_str());
+        }
+
+        try {
+            int64_t offset, len;
+            offset = cursor - curBlock->getOffset();
+            assert(offset >= 0);
+            len = curBlock->getNumBytes() - offset;
+            assert(len > 0);
+
+            if (auth.getProtocol() == AuthProtocol::NONE &&
+                !temporaryDisableLocalRead && !lastReadFromLocal &&
+                !readFromUnderConstructedBlock && localRead && isLocalNode()) {
+                lastReadFromLocal = true;
+                LocalBlockInforCacheType &cache =
+                    getBlockLocalPathInfoCache(curNode.getXferPort());
+                BlockLocalPathInfo info =
+                    getBlockLocalPathInfo(cache, *curBlock);
+                assert(curBlock->getBlockId() == info.getBlock().getBlockId() &&
+                       curBlock->getPoolId() == info.getBlock().getPoolId());
+                LOG(DEBUG2,
+                    "%p setup local block reader for file %s from "
+                    "local block %s, block offset %" PRId64
+                    ", read block "
+                    "length %" PRId64 " end of Block %" PRId64
+                    ", local "
+                    "block file path %s",
+                    this, path.c_str(), curBlock->toString().c_str(), offset,
+                    len, offset + len, info.getLocalBlockPath());
+
+                if (0 != access(info.getLocalMetaPath(), R_OK)) {
+                    invalidCacheEntry(cache, *curBlock);
+                    continue;
+                }
+
+                try {
+                    blockReader = shared_ptr<BlockReader>(
+                        new LocalBlockReader(info, *curBlock, offset, verify,
+                                             *conf, localReaderBuffer));
+                } catch (...) {
+                    invalidCacheEntry(cache, *curBlock);
+                    throw;
+                }
+            } else {
+                const char *clientName;
+                LOG(DEBUG2,
+                    "%p setup remote block reader for file %s from "
+                    "remote block %s, block offset %" PRId64
+                    ""
+                    ", read block length %" PRId64 " end of block %" PRId64
+                    ", from node %s",
+                    this, path.c_str(), curBlock->toString().c_str(), offset,
+                    len, offset + len, curNode.formatAddress().c_str());
+                clientName = filesystem->getClientName();
+                lastReadFromLocal = false;
+                blockReader = shared_ptr<BlockReader>(new RemoteBlockReader(
+                    *curBlock, curNode, offset, len, curBlock->getToken(),
+                    clientName, verify, *conf));
+            }
+
+            break;
+        } catch (const HdfsIOException &e) {
+            lastException = current_exception();
+
+            if (lastReadFromLocal) {
+                LOG(LOG_ERROR,
+                    "cannot setup block reader for Block: %s file %s "
+                    "on Datanode: %s.\n%s\n"
+                    "retry the same node but disable reading from local block",
+                    curBlock->toString().c_str(), path.c_str(),
+                    curNode.formatAddress().c_str(), GetExceptionDetail(e));
+                /*
+                 * Do not add the node to failedNodes, since we will retry
+                 * the same node but disable local block reading.
+                 */
+            } else {
+                LOG(LOG_ERROR,
+                    "cannot setup block reader for Block: %s file %s on "
+                    "Datanode: %s.\n%s\nretry another node",
+                    curBlock->toString().c_str(), path.c_str(),
+                    curNode.formatAddress().c_str(), GetExceptionDetail(e));
+                failedNodes.push_back(curNode);
+                std::sort(failedNodes.begin(), failedNodes.end());
+            }
+        }
+    }
+}
+
+void InputStreamImpl::open(shared_ptr<FileSystemImpl> fs, const char *path,
+                           bool verifyChecksum) {
+    if (NULL == path || 0 == strlen(path)) {
+        THROW(InvalidParameter, "path is invalid.");
+    }
+
+    try {
+        openInternal(fs, path, verifyChecksum);
+    } catch (...) {
+        close();
+        throw;
+    }
+}
+
+void InputStreamImpl::openInternal(shared_ptr<FileSystemImpl> fs,
+                                   const char *path, bool verifyChecksum) {
+    try {
+        filesystem = fs;
+        verify = verifyChecksum;
+        this->path = fs->getStandardPath(path);
+        LOG(DEBUG2, "%p, open file %s for read, verfyChecksum is %s", this,
+            this->path.c_str(), (verifyChecksum ? "true" : "false"));
+        conf = shared_ptr<SessionConfig>(new SessionConfig(fs->getConf()));
+        this->auth = RpcAuth(fs->getUserInfo(),
+                             RpcAuth::ParseMethod(conf->getRpcAuthMethod()));
+        prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize();
+        localRead = conf->isReadFromLocal();
+        maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry();
+        updateBlockInfos();
+        closed = false;
+    } catch (const HdfsCanceled &e) {
+        throw;
+    } catch (const FileNotFoundException &e) {
+        throw;
+    } catch (const HdfsException &e) {
+        NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.",
+                     this->path.c_str());
+    }
+}
+
+int32_t InputStreamImpl::read(char *buf, int32_t size) {
+    checkStatus();
+
+    try {
+        int64_t previous = cursor;
+        int32_t done = readInternal(buf, size);
+        LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64
+                    " done %d, "
+                    "next pos %" PRId64,
+            this, path.c_str(), size, previous, done, cursor);
+        return done;
+    } catch (const HdfsEndOfStream &e) {
+        throw;
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+int32_t InputStreamImpl::readOneBlock(char *buf, int32_t size,
+                                      bool shouldUpdateMetadataOnFailure) {
+    bool temporaryDisableLocalRead = false;
+
+    while (true) {
+        try {
+            /*
+             * Set up the block reader here and handle failures.
+             */
+            if (!blockReader) {
+                setupBlockReader(temporaryDisableLocalRead);
+                temporaryDisableLocalRead = false;
+            }
+        } catch (const HdfsInvalidBlockToken &e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to read Block: %s file %s, \n%s, "
+                "retry after updating block informations.",
+                curBlock->toString().c_str(), path.c_str(),
+                GetExceptionDetail(e));
+            return -1;
+        } catch (const HdfsIOException &e) {
+            /*
+             * In setupBlockReader, we have tried all the replicas.
+             * We now update the block information once and try again.
+             */
+            if (shouldUpdateMetadataOnFailure) {
+                LOG(LOG_ERROR,
+                    "InputStreamImpl: failed to read Block: %s file %s, \n%s, "
+                    "retry after updating block informations.",
+                    curBlock->toString().c_str(), path.c_str(),
+                    GetExceptionDetail(e));
+                return -1;
+            } else {
+                /*
+                 * We have already updated the block information and failed again.
+                 */
+                throw;
+            }
+        }
+
+        /*
+         * The block reader has been set up; read from it.
+         */
+        try {
+            int32_t todo = size;
+            todo = todo < endOfCurBlock - cursor
+                       ? todo
+                       : static_cast<int32_t>(endOfCurBlock - cursor);
+            assert(blockReader);
+            todo = blockReader->read(buf, todo);
+            cursor += todo;
+            /*
+             * Exit the loop and the function here on success.
+             */
+            return todo;
+        } catch (const HdfsIOException &e) {
+            /*
+             * Failed to read from the current block reader;
+             * add the current datanode to the failed node list and try again.
+             */
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to read Block: %s file %s from "
+                "Datanode: %s, \n%s, "
+                "retry read again from another Datanode.",
+                curBlock->toString().c_str(), path.c_str(),
+                curNode.formatAddress().c_str(), GetExceptionDetail(e));
+
+            if (conf->doesNotRetryAnotherNode()) {
+                throw;
+            }
+        } catch (const ChecksumException &e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to read Block: %s file %s "
+                "from Datanode: %s, \n%s, retry read again from "
+                "another Datanode.",
+                curBlock->toString().c_str(), path.c_str(),
+                curNode.formatAddress().c_str(), GetExceptionDetail(e));
+        }
+
+        /*
+         * Successfully created the block reader but failed to read.
+         * Disable the local block reader and try the same node again.
+         */
+        if (!blockReader ||
+            dynamic_cast<LocalBlockReader *>(blockReader.get())) {
+            temporaryDisableLocalRead = true;
+        } else {
+            /*
+             * Remote block reader failed to read, try another node.
+             */
+            LOG(INFO,
+                "IntputStreamImpl: Add invalid datanode %s to failed "
+                "datanodes and try another datanode again for file %s.",
+                curNode.formatAddress().c_str(), path.c_str());
+            failedNodes.push_back(curNode);
+            std::sort(failedNodes.begin(), failedNodes.end());
+        }
+
+        blockReader.reset();
+    }
+}
+
+/**
+ * To read data from hdfs.
+ * @param buf the buffer to be filled.
+ * @param size buffer size.
+ * @return the number of bytes filled into the buffer; it may be less than
+ * size.
+ */
+int32_t InputStreamImpl::readInternal(char *buf, int32_t size) {
+    int updateMetadataOnFailure = conf->getMaxReadBlockRetry();
+
+    try {
+        do {
+            const LocatedBlock *lb = NULL;
+
+            /*
+             * Check whether we already have the block information we need.
+             */
+            if (!lbs || cursor >= getFileLength() ||
+                (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) {
+                /*
+                 * Get block information from namenode.
+                 * Do RPC failover work in updateBlockInfos.
+                 */
+                updateBlockInfos();
+
+                /*
+                 * We now have up-to-date block information;
+                 * check whether we have reached the end of file.
+                 */
+                if (cursor >= getFileLength()) {
+                    THROW(HdfsEndOfStream,
+                          "InputStreamImpl: read over EOF, current position: "
+                          "%" PRId64 ", read size: %d, from file: %s",
+                          cursor, size, path.c_str());
+                }
+            }
+
+            /*
+             * If we reach the end of the block, or the block information
+             * has just been updated, seek to the right block to read.
+             */
+            if (cursor >= endOfCurBlock) {
+                lb = lbs->findBlock(cursor);
+
+                if (!lb) {
+                    THROW(HdfsIOException,
+                          "InputStreamImpl: cannot find block information at "
+                          "position: %" PRId64 " for file: %s",
+                          cursor, path.c_str());
+                }
+
+                /*
+                 * Seek to the right block and set up all needed variables,
+                 * but do not set up the block reader yet; that happens later.
+                 */
+                seekToBlock(*lb);
+            }
+
+            int32_t retval =
+                readOneBlock(buf, size, updateMetadataOnFailure > 0);
+
+            /*
+             * Now we have tried all replicas and failed.
+             * We will update metadata once and try again.
+             */
+            if (retval < 0) {
+                lbs.reset();
+                endOfCurBlock = 0;
+                --updateMetadataOnFailure;
+
+                try {
+                    sleep_for(seconds(1));
+                } catch (...) {
+                }
+
+                continue;
+            }
+
+            return retval;
+        } while (true);
+    } catch (const HdfsCanceled &e) {
+        throw;
+    } catch (const HdfsEndOfStream &e) {
+        throw;
+    } catch (const HdfsException &e) {
+        /*
+         * wrap the underlying error and rethrow.
+         */
+        NESTED_THROW(HdfsIOException,
+                     "InputStreamImpl: cannot read file: %s, from "
+                     "position %" PRId64 ", size: %d.",
+                     path.c_str(), cursor, size);
+    }
+}
+
+/**
+ * To read data from hdfs, blocking until the given number of bytes is read.
+ * @param buf the buffer to be filled.
+ * @param size the number of bytes to be read.
+ */
+void InputStreamImpl::readFully(char *buf, int64_t size) {
+    LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64,
+        path.c_str(), size, cursor);
+    checkStatus();
+
+    try {
+        return readFullyInternal(buf, size);
+    } catch (const HdfsEndOfStream &e) {
+        throw;
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+void InputStreamImpl::readFullyInternal(char *buf, int64_t size) {
+    int32_t done;
+    int64_t pos = cursor, todo = size;
+
+    try {
+        while (todo > 0) {
+            done = todo < std::numeric_limits<int32_t>::max()
+                       ? static_cast<int32_t>(todo)
+                       : std::numeric_limits<int32_t>::max();
+            done = readInternal(buf + (size - todo), done);
+            todo -= done;
+        }
+    } catch (const HdfsCanceled &e) {
+        throw;
+    } catch (const HdfsEndOfStream &e) {
+        THROW(HdfsEndOfStream,
+              "InputStreamImpl: read over EOF, current position: %" PRId64
+              ", read size: %" PRId64 ", from file: %s",
+              pos, size, path.c_str());
+    } catch (const HdfsException &e) {
+        NESTED_THROW(HdfsIOException,
+                     "InputStreamImpl: cannot read fully from file: %s, "
+                     "from position %" PRId64 ", size: %" PRId64 ".",
+                     path.c_str(), pos, size);
+    }
+}
+
+int64_t InputStreamImpl::available() {
+    checkStatus();
+
+    try {
+        if (blockReader) {
+            return blockReader->available();
+        }
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+
+    return 0;
+}
+
+/**
+ * To move the file pointer to the given position.
+ * @param pos the given position.
+ */
+void InputStreamImpl::seek(int64_t pos) {
+    LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this,
+        path.c_str(), pos, cursor);
+    checkStatus();
+
+    try {
+        seekInternal(pos);
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+void InputStreamImpl::seekInternal(int64_t pos) {
+    if (cursor == pos) {
+        return;
+    }
+
+    if (!lbs || pos > getFileLength()) {
+        updateBlockInfos();
+
+        if (pos > getFileLength()) {
+            THROW(HdfsEndOfStream,
+                  "InputStreamImpl: seek over EOF, current position: %" PRId64
+                  ", seek target: %" PRId64 ", in file: %s",
+                  cursor, pos, path.c_str());
+        }
+    }
+
+    try {
+        if (blockReader && pos > cursor && pos < endOfCurBlock) {
+            blockReader->skip(pos - cursor);
+            cursor = pos;
+            return;
+        }
+    } catch (const HdfsIOException &e) {
+        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64
+                       " bytes in current block reader for file %s\n%s",
+            pos - cursor, path.c_str(), GetExceptionDetail(e));
+        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64
+                  " for file %s",
+            pos, path.c_str());
+    } catch (const ChecksumException &e) {
+        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64
+                       " bytes in current block reader for file %s\n%s",
+            pos - cursor, path.c_str(), GetExceptionDetail(e));
+        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64
+                  " for file %s",
+            pos, path.c_str());
+    }
+
+    /*
+     * The seek target is beyond the current block, or the skip failed in the
+     * current block reader. Reset the current block reader and set the cursor
+     * to the target position.
+     */
+    endOfCurBlock = 0;
+    blockReader.reset();
+    cursor = pos;
+}
+
+/**
+ * To get the current file pointer position.
+ * @return the position of the current file pointer.
+ */
+int64_t InputStreamImpl::tell() {
+    checkStatus();
+    LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), cursor);
+    return cursor;
+}
+
+/**
+ * Close the stream.
+ */
+void InputStreamImpl::close() {
+    LOG(DEBUG2, "%p close file %s for read", this, path.c_str());
+    closed = true;
+    localRead = true;
+    readFromUnderConstructedBlock = false;
+    verify = true;
+    filesystem.reset();
+    cursor = 0;
+    endOfCurBlock = 0;
+    lastBlockBeingWrittenLength = 0;
+    prefetchSize = 0;
+    blockReader.reset();
+    curBlock.reset();
+    lbs.reset();
+    conf.reset();
+    failedNodes.clear();
+    path.clear();
+    localReaderBuffer.resize(0);
+    lastError = exception_ptr();
+}
+
+std::string InputStreamImpl::toString() {
+    if (!path.empty()) {
+        return std::string("InputStream for path ") + path;
+    } else {
+        return std::string("InputStream (not opened)");
+    }
+}
+}
+}

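One small technique worth noting in the reader above: failed datanodes are
kept in a sorted std::vector so that choseBestNode() can test membership with
std::binary_search (see setupBlockReader() and readOneBlock()). The
bookkeeping, isolated into a sketch with a stand-in node type:

    #include <algorithm>
    #include <string>
    #include <vector>

    typedef std::string Node;  // stand-in for DatanodeInfo

    // Record a failure; re-sorting keeps binary_search valid.
    void MarkFailed(std::vector<Node> &failedNodes, const Node &n) {
        failedNodes.push_back(n);
        std::sort(failedNodes.begin(), failedNodes.end());
    }

    // Mirrors choseBestNode(): pick the first replica not yet failed.
    bool ChooseNode(const std::vector<Node> &replicas,
                    const std::vector<Node> &failedNodes, Node *out) {
        for (size_t i = 0; i < replicas.size(); ++i) {
            if (std::binary_search(failedNodes.begin(), failedNodes.end(),
                                   replicas[i])) {
                continue;  // this replica already failed, skip it
            }

            *out = replicas[i];
            return true;
        }

        return false;  // every replica has failed
    }

Since replica counts are small, sort-plus-binary_search is effectively free
and avoids allocating a set per stream.
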
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
index 8723344..5202a33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
@@ -19,13 +19,10 @@
 #ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
 #define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
 
-#include "platform.h"
-
 #include "BlockReader.h"
 #include "ExceptionInternal.h"
 #include "FileSystem.h"
 #include "Hash.h"
-#include "InputStreamInter.h"
 #include "LruMap.h"
 #include "SessionConfig.h"
 #include "SharedPtr.h"
@@ -45,14 +42,16 @@ namespace hdfs {
 namespace internal {
 
 typedef std::pair<int64_t, std::string> LocalBlockInforCacheKey;
-typedef LruMap<LocalBlockInforCacheKey, BlockLocalPathInfo> LocalBlockInforCacheType;
+typedef LruMap<LocalBlockInforCacheKey, BlockLocalPathInfo>
+    LocalBlockInforCacheType;
 
 /**
  * An input stream used to read data from hdfs.
  */
-class InputStreamImpl: public InputStreamInter {
+class InputStreamImpl {
 public:
     InputStreamImpl();
+
     ~InputStreamImpl();
 
     /**
@@ -61,22 +60,24 @@ public:
      * @param path the file to be read.
      * @param verifyChecksum verify the checksum.
      */
-    void open(shared_ptr<FileSystemInter> fs, const char * path, bool verifyChecksum);
+    void open(shared_ptr<FileSystemImpl> fs, const char *path,
+              bool verifyChecksum);
 
     /**
      * To read data from hdfs.
      * @param buf the buffer to be filled.
      * @param size buffer size.
-     * @return return the number of bytes filled in the buffer, it may less than size.
+     * @return the number of bytes filled into the buffer; it may be less
+     * than size.
      */
-    int32_t read(char * buf, int32_t size);
+    int32_t read(char *buf, int32_t size);
 
     /**
      * To read data from hdfs, blocking until the given number of bytes is read.
      * @param buf the buffer to be filled.
      * @param size the number of bytes to be read.
      */
-    void readFully(char * buf, int64_t size);
+    void readFully(char *buf, int64_t size);
 
     int64_t available();
 
@@ -100,23 +101,24 @@ public:
     std::string toString();
 
 private:
-    BlockLocalPathInfo getBlockLocalPathInfo(LocalBlockInforCacheType & cache,
-            const LocatedBlock & b);
+    BlockLocalPathInfo getBlockLocalPathInfo(LocalBlockInforCacheType &cache,
+                                             const LocatedBlock &b);
     bool choseBestNode();
     bool isLocalNode();
-    int32_t readInternal(char * buf, int32_t size);
-    int32_t readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure);
+    int32_t readInternal(char *buf, int32_t size);
+    int32_t readOneBlock(char *buf, int32_t size,
+                         bool shouldUpdateMetadataOnFailure);
     int64_t getFileLength();
-    int64_t readBlockLength(const LocatedBlock & b);
-    LocalBlockInforCacheType & getBlockLocalPathInfoCache(uint32_t port);
+    int64_t readBlockLength(const LocatedBlock &b);
+    LocalBlockInforCacheType &getBlockLocalPathInfoCache(uint32_t port);
     void checkStatus();
-    void invalidCacheEntry(LocalBlockInforCacheType & cache,
-                           const LocatedBlock & b);
-    void openInternal(shared_ptr<FileSystemInter> fs, const char * path,
+    void invalidCacheEntry(LocalBlockInforCacheType &cache,
+                           const LocatedBlock &b);
+    void openInternal(shared_ptr<FileSystemImpl> fs, const char *path,
                       bool verifyChecksum);
-    void readFullyInternal(char * buf, int64_t size);
+    void readFullyInternal(char *buf, int64_t size);
     void seekInternal(int64_t pos);
-    void seekToBlock(const LocatedBlock & lb);
+    void seekToBlock(const LocatedBlock &lb);
     void setupBlockReader(bool temporaryDisableLocalRead);
     void updateBlockInfos();
 
@@ -135,7 +137,7 @@ private:
     int64_t prefetchSize;
     RpcAuth auth;
     shared_ptr<BlockReader> blockReader;
-    shared_ptr<FileSystemInter> filesystem;
+    shared_ptr<FileSystemImpl> filesystem;
     shared_ptr<LocatedBlock> curBlock;
     shared_ptr<LocatedBlocks> lbs;
     shared_ptr<SessionConfig> conf;
@@ -144,28 +146,31 @@ private:
     std::vector<char> localReaderBuffer;
 
     static mutex MutLocalBlockInforCache;
-    static unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType> > LocalBlockInforCache;
-#ifdef MOCK
+    static unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>>
+        LocalBlockInforCache;
+
 private:
-    hdfs::mock::TestDatanodeStub * stub;
+    InputStreamImpl(const InputStreamImpl &other);
+    InputStreamImpl &operator=(const InputStreamImpl &other);
+
+#ifdef MOCK
+    hdfs::mock::TestDatanodeStub *stub;
 #endif
 };
-
 }
 }
 
 #ifdef NEED_BOOST
 
 namespace boost {
-template<>
+template <>
 struct hash<hdfs::internal::LocalBlockInforCacheKey> {
     std::size_t operator()(
-        const hdfs::internal::LocalBlockInforCacheKey & key) const {
+        const hdfs::internal::LocalBlockInforCacheKey &key) const {
         size_t values[] = {hdfs::internal::Int64Hasher(key.first),
-                           hdfs::internal::StringHasher(key.second)
-                          };
-        return hdfs::internal::CombineHasher(values,
-                                             sizeof(values) / sizeof(values[0]));
+                           hdfs::internal::StringHasher(key.second)};
+        return hdfs::internal::CombineHasher(
+            values, sizeof(values) / sizeof(values[0]));
     }
 };
 }
@@ -173,15 +178,14 @@ struct hash<hdfs::internal::LocalBlockInforCacheKey> {
 #else
 
 namespace std {
-template<>
+template <>
 struct hash<hdfs::internal::LocalBlockInforCacheKey> {
     std::size_t operator()(
-        const hdfs::internal::LocalBlockInforCacheKey & key) const {
-        size_t values[] = { hdfs::internal::Int64Hasher(key.first),
-                            hdfs::internal::StringHasher(key.second)
-                          };
-        return hdfs::internal::CombineHasher(values,
-                                             sizeof(values) / sizeof(values[0]));
+        const hdfs::internal::LocalBlockInforCacheKey &key) const {
+        size_t values[] = {hdfs::internal::Int64Hasher(key.first),
+                           hdfs::internal::StringHasher(key.second)};
+        return hdfs::internal::CombineHasher(
+            values, sizeof(values) / sizeof(values[0]));
     }
 };
 }

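The hash<LocalBlockInforCacheKey> specializations above exist so that the
(block id, pool id) pair can key the per-port LruMap. The underlying idea,
sketched with standard hashers and the common hash_combine mixing constant
rather than libhdfs3's Int64Hasher/StringHasher/CombineHasher:

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <string>
    #include <utility>

    typedef std::pair<int64_t, std::string> CacheKey;  // (block id, pool id)

    struct CacheKeyHash {
        std::size_t operator()(const CacheKey &key) const {
            std::size_t h = std::hash<int64_t>()(key.first);
            std::size_t h2 = std::hash<std::string>()(key.second);
            // boost::hash_combine-style mixing; any good mixer works here.
            h ^= h2 + 0x9e3779b9 + (h << 6) + (h >> 2);
            return h;
        }
    };
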
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
index 6118403..d26745d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
@@ -32,11 +32,11 @@
 namespace hdfs {
 namespace internal {
 
-class LocalBlockReader: public BlockReader {
+class LocalBlockReader : public BlockReader {
 public:
-    LocalBlockReader(const BlockLocalPathInfo & info,
-                     const ExtendedBlock & block, int64_t offset, bool verify,
-                     SessionConfig & conf, std::vector<char> & buffer);
+    LocalBlockReader(const BlockLocalPathInfo &info, const ExtendedBlock &block,
+                     int64_t offset, bool verify, SessionConfig &conf,
+                     std::vector<char> &buffer);
 
     ~LocalBlockReader();
 
@@ -55,7 +55,7 @@ public:
      * @return return the number of bytes filled in the buffer,
      *  it may less than size. Return 0 if reach the end of block.
      */
-    virtual int32_t read(char * buf, int32_t size);
+    virtual int32_t read(char *buf, int32_t size);
 
     /**
      * Move the cursor forward len bytes.
@@ -64,34 +64,36 @@ public:
     virtual void skip(int64_t len);
 
 private:
+    LocalBlockReader(const LocalBlockReader &other);
+    LocalBlockReader &operator=(const LocalBlockReader &other);
+
     /**
      * Fill buffer and verify checksum.
      * @param bufferSize The size of buffer.
      */
     void readAndVerify(int32_t bufferSize);
-    int32_t readInternal(char * buf, int32_t len);
+    int32_t readInternal(char *buf, int32_t len);
 
 private:
-    bool verify; //verify checksum or not.
+    bool verify;  // verify checksum or not.
     const char *pbuffer;
     const char *pMetaBuffer;
     const ExtendedBlock &block;
     int checksumSize;
     int chunkSize;
     int localBufferSize;
-    int position; //point in buffer.
-    int size;  //data size in buffer.
-    int64_t cursor; //point in block.
-    int64_t length; //data size of block.
+    int position;    // position in buffer.
+    int size;        // data size in buffer.
+    int64_t cursor;  // position in block.
+    int64_t length;  // data size of block.
     shared_ptr<Checksum> checksum;
     shared_ptr<FileWrapper> dataFd;
     shared_ptr<FileWrapper> metaFd;
     std::string dataFilePath;
     std::string metaFilePath;
-    std::vector<char> & buffer;
+    std::vector<char> &buffer;
     std::vector<char> metaBuffer;
 };
-
 }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
index 7598344..dd69b8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
@@ -29,11 +29,10 @@ namespace internal {
 
 class ConstPacketBuffer {
 public:
-    ConstPacketBuffer(const char * buf, int size) :
-        buffer(buf), size(size) {
+    ConstPacketBuffer(const char *buf, int size) : buffer(buf), size(size) {
     }
 
-    const char * getBuffer() const {
+    const char *getBuffer() const {
         return buffer;
     }
 
@@ -42,7 +41,7 @@ public:
     }
 
 private:
-    const char * buffer;
+    const char *buffer;
     const int size;
 };
 
@@ -65,13 +64,15 @@ public:
     /**
      * create a new packet
      */
-    Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize);
+    Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno,
+           int checksumSize);
 
-    void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize);
+    void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
+               int64_t seqno, int checksumSize);
 
     void addChecksum(uint32_t checksum);
 
-    void addData(const char * buf, int size);
+    void addData(const char *buf, int size);
 
     void setSyncFlag(bool sync);
 
@@ -102,21 +103,20 @@ public:
     }
 
 private:
-    bool lastPacketInBlock; // is this the last packet in block
-    bool syncBlock; // sync block to disk?
+    bool lastPacketInBlock;  // is this the last packet in block
+    bool syncBlock;          // sync block to disk?
     int checksumPos;
     int checksumSize;
     int checksumStart;
     int dataPos;
     int dataStart;
     int headerStart;
-    int maxChunks; // max chunks in packet
-    int numChunks; // number of chunks currently in packet
-    int64_t offsetInBlock; // offset in block
-    int64_t seqno; // sequence number of packet in block
+    int maxChunks;          // max chunks in packet
+    int numChunks;          // number of chunks currently in packet
+    int64_t offsetInBlock;  // offset in block
+    int64_t seqno;          // sequence number of packet in block
     std::vector<char> buffer;
 };
-
 }
 }
 #endif /* _HDFS_LIBHDFS3_CLIENT_PACKET_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
index f8447b8..6d46a7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
@@ -28,6 +28,8 @@ namespace internal {
 
 class PacketHeader {
 public:
+    static int GetPkgHeaderSize();
+    static int CalcPkgHeaderSize();
     PacketHeader();
     PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno,
                  bool lastPacketInBlock, int dataLen);
@@ -37,24 +39,19 @@ public:
     int getPacketLen();
     int64_t getOffsetInBlock();
     int64_t getSeqno();
-    void readFields(const char * buf, size_t size);
+    void readFields(const char *buf, size_t size);
+
     /**
      * Write the header into the buffer.
      * This requires that PKT_HEADER_LEN bytes are available.
      */
-    void writeInBuffer(char * buf, size_t size);
-
-public:
-    static int GetPkgHeaderSize();
-    static int CalcPkgHeaderSize();
+    void writeInBuffer(char *buf, size_t size);
 
 private:
     static int PkgHeaderSize;
-private:
     int32_t packetLen;
     PacketHeaderProto proto;
 };
-
 }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
index 548118b..2a29acb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
@@ -35,7 +35,7 @@
 namespace hdfs {
 namespace internal {
 
-class RemoteBlockReader: public BlockReader {
+class RemoteBlockReader : public BlockReader {
 public:
     RemoteBlockReader(const ExtendedBlock &eb, DatanodeInfo &datanode,
                       int64_t start, int64_t len, const Token &token,
@@ -65,6 +65,8 @@ public:
     virtual void skip(int64_t len);
 
 private:
+    RemoteBlockReader(const RemoteBlockReader &other);
+    RemoteBlockReader &operator=(RemoteBlockReader &other);
     bool readTrailingEmptyPacket();
     shared_ptr<PacketHeader> readPacketHeader();
     void checkResponse();
@@ -72,20 +74,19 @@ private:
     void sendStatus();
     void verifyChecksum(int chunks);
 
-private:
-    bool verify; //verify checksum or not.
+    bool verify;  // verify checksum or not.
     DatanodeInfo &datanode;
     const ExtendedBlock &binfo;
     int checksumSize;
     int chunkSize;
     int connTimeout;
-    int position; //point in buffer.
+    int position;  // position in buffer.
     int readTimeout;
-    int size;  //data size in buffer.
+    int size;  // data size in buffer.
     int writeTimeout;
-    int64_t cursor; //point in block.
-    int64_t endOffset; //offset in block requested to read to.
-    int64_t lastSeqNo; //segno of the last chunk received
+    int64_t cursor;     // position in block.
+    int64_t endOffset;  // offset in block requested to read to.
+    int64_t lastSeqNo;  // seqno of the last chunk received.
     shared_ptr<BufferedSocketReader> in;
     shared_ptr<Checksum> checksum;
     shared_ptr<DataTransferProtocol> sender;
@@ -93,7 +94,6 @@ private:
     shared_ptr<Socket> sock;
     std::vector<char> buffer;
 };
-
 }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h
new file mode 100644
index 0000000..e00baa8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_
+#define _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_
+
+#include "Hash.h"
+#include "Token.h"
+
+HDFS_HASH_DEFINE(::hdfs::internal::Token);
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ */
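
Hash.h, which defines HDFS_HASH_DEFINE, is not part of this hunk, so its exact expansion is not shown here; macros of this shape conventionally specialize the standard hash functor so the type can key unordered containers. A hedged sketch of that general pattern (C++11 spelling, all names made up):

    #include <cstddef>
    #include <functional>
    #include <string>
    #include <unordered_map>

    // Hypothetical stand-in for the real macro in Hash.h: specialize
    // std::hash to forward to the type's own hash_value() member.
    #define SKETCH_HASH_DEFINE(TYPE)                 \
        namespace std {                              \
        template <>                                  \
        struct hash<TYPE> {                          \
            size_t operator()(const TYPE &v) const { \
                return v.hash_value();               \
            }                                        \
        };                                           \
        }

    struct Token {  // simplified stand-in for hdfs::internal::Token
        std::string identifier;
        size_t hash_value() const {
            return std::hash<std::string>()(identifier);
        }
        bool operator==(const Token &o) const {
            return identifier == o.identifier;
        }
    };

    SKETCH_HASH_DEFINE(Token)

    int main() {
        // The specialization lets Token key an unordered container.
        std::unordered_map<Token, std::string> owners;
        Token t;
        t.identifier = "block-token";
        owners[t] = "hdfs";
        return 0;
    }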

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc
new file mode 100644
index 0000000..3e1e841
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"){} you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Config.h"
+#include "ConfigImpl.h"
+#include "XmlConfigParser.h"
+#include "StatusInternal.h"
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+
+Config Config::CreateFromXmlFile(const std::string &path) {
+    return Config(new ConfigImpl(XmlConfigParser(path.c_str()).getKeyValue()));
+}
+
+Config::Config() : impl(new ConfigImpl) {
+}
+
+Config::Config(const Config &other) {
+    impl = new ConfigImpl(*other.impl);
+}
+
+Config::Config(ConfigImpl *impl) : impl(impl) {
+}
+
+Config &Config::operator=(const Config &other) {
+    if (this == &other) {
+        return *this;
+    }
+
+    ConfigImpl *temp = impl;
+    impl = new ConfigImpl(*other.impl);
+    delete temp;
+    return *this;
+}
+
+bool Config::operator==(const Config &other) const {
+    if (this == &other) {
+        return true;
+    }
+
+    return *impl == *other.impl;
+}
+
+Config::~Config() {
+    delete impl;
+}
+
+Status Config::getString(const std::string &key, std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getString(key.c_str());
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getString(const std::string &key, const std::string &def,
+                         std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getString(key.c_str(), def.c_str());
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getInt64(const std::string &key, int64_t *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getInt64(key.c_str());
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getInt64(const std::string &key, int64_t def,
+                        int64_t *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getInt64(key.c_str(), def);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getInt32(const std::string &key, int32_t *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getInt32(key.c_str());
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getInt32(const std::string &key, int32_t def,
+                        int32_t *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getInt32(key.c_str(), def);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getDouble(const std::string &key, double *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getDouble(key.c_str());
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getDouble(const std::string &key, double def,
+                         double *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getDouble(key.c_str(), def);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getBool(const std::string &key, bool *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getBool(key.c_str());
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+Status Config::getBool(const std::string &key, bool def,
+                       bool *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        *output = impl->getBool(key.c_str(), def);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+void Config::set(const std::string &key, const std::string &value) {
+    impl->set(key.c_str(), value);
+}
+
+void Config::set(const std::string &key, int32_t value) {
+    impl->set(key.c_str(), value);
+}
+
+void Config::set(const std::string &key, int64_t value) {
+    impl->set(key.c_str(), value);
+}
+
+void Config::set(const std::string &key, double value) {
+    impl->set(key.c_str(), value);
+}
+
+void Config::set(const std::string &key, bool value) {
+    impl->set(key.c_str(), value);
+}
+
+size_t Config::hash_value() const {
+    return impl->hash_value();
+}
+}
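
Every public accessor above follows the same boundary discipline as the rest of the client API: internal code throws, and catch (...) plus CreateStatusFromException() flattens the active exception into a Status before it can cross the library boundary. A simplified, self-contained sketch of that firewall pattern; SketchStatus and the helper names are stand-ins, not the real Status API:

    #include <errno.h>

    #include <exception>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Simplified stand-in for hdfs::Status (the real one is in Status.h).
    struct SketchStatus {
        int code;  // 0 means success
        std::string message;
    };

    // Mirrors the role of CreateStatusFromException(): re-throw the active
    // exception so it can be inspected, then flatten it into a value.
    static SketchStatus FromCurrentException() {
        try {
            throw;
        } catch (const std::exception &e) {
            SketchStatus s = {EIO, e.what()};
            return s;
        } catch (...) {
            SketchStatus s = {EIO, "unknown error"};
            return s;
        }
    }

    // A boundary method in the style of the accessors above: internal code
    // may throw, but the caller only ever sees a status value.
    static SketchStatus ParseDigit(const std::string &text, int *out) {
        try {
            if (text.size() != 1 || text[0] < '0' || text[0] > '9') {
                throw std::runtime_error("not a single digit: " + text);
            }
            *out = text[0] - '0';
            SketchStatus ok = {0, ""};
            return ok;
        } catch (...) {
            return FromCurrentException();
        }
    }

    int main() {
        int v = 0;
        SketchStatus s = ParseDigit("7", &v);
        std::cout << s.code << " " << v << std::endl;  // prints "0 7"
        s = ParseDigit("seven", &v);
        std::cout << s.message << std::endl;  // "not a single digit: seven"
        return 0;
    }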

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h
new file mode 100644
index 0000000..4359e3f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_COMMON_CONFIG_H_
+#define _HDFS_LIBHDFS3_COMMON_CONFIG_H_
+
+#include "Status.h"
+
+#include <stdint.h>
+#include <string>
+#include <map>
+
+namespace hdfs {
+
+class FileSystem;
+class NamenodeInfo;
+
+namespace internal {
+class ConfigImpl;
+class FileSystemImpl;
+}
+
+/**
+ * A configuration file parser.
+ */
+class Config {
+public:
+    /**
+     * Create an instance from an XML file.
+     * @param path The path of the configuration file.
+     */
+    static Config CreateFromXmlFile(const std::string &path);
+
+    /**
+     * Construct an empty Config instance.
+     */
+    Config();
+
+    /**
+     * Copy constructor
+     */
+    Config(const Config &other);
+
+    /**
+     * Assignment operator.
+     */
+    Config &operator=(const Config &other);
+
+    /**
+     * Operator equal
+     */
+    bool operator==(const Config &other) const;
+
+    /**
+     * Destroy this instance
+     */
+    ~Config();
+
+    /**
+     * Get a string value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getString(const std::string &key, std::string *output) const;
+
+    /**
+     * Get a string value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getString(const std::string &key, const std::string &def,
+                     std::string *output) const;
+
+    /**
+     * Get a 64-bit integer value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getInt64(const std::string &key, int64_t *output) const;
+
+    /**
+     * Get a 64-bit integer value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getInt64(const std::string &key, int64_t def,
+                    int64_t *output) const;
+
+    /**
+     * Get a 32-bit integer value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getInt32(const std::string &key, int32_t *output) const;
+
+    /**
+     * Get a 32-bit integer value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getInt32(const std::string &key, int32_t def,
+                    int32_t *output) const;
+
+    /**
+     * Get a double value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getDouble(const std::string &key, double *output) const;
+
+    /**
+     * Get a double value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getDouble(const std::string &key, double def,
+                     double *output) const;
+
+    /**
+     * Get a boolean value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getBool(const std::string &key, bool *output) const;
+
+    /**
+     * Get a boolean value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @param output The pointer to the output value.
+     * @return The status of this operation.
+     */
+    Status getBool(const std::string &key, bool def, bool *output) const;
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set it to.
+     */
+    void set(const std::string &key, const std::string &value);
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set it to.
+     */
+    void set(const std::string &key, int32_t value);
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set it to.
+     */
+    void set(const std::string &key, int64_t value);
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set it to.
+     */
+    void set(const std::string &key, double value);
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set it to.
+     */
+    void set(const std::string &key, bool value);
+
+    /**
+     * Get the hash value of this object
+     * @return The hash value
+     */
+    size_t hash_value() const;
+
+private:
+    Config(hdfs::internal::ConfigImpl *impl);
+    hdfs::internal::ConfigImpl *impl;
+    friend class hdfs::FileSystem;
+    friend class hdfs::internal::FileSystemImpl;
+    friend class hdfs::NamenodeInfo;
+};
+}
+
+#endif /* _HDFS_LIBHDFS3_COMMON_CONFIG_H_ */
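
A hedged usage sketch of the API declared above. The XML path and key names are illustrative only, and since Status.h is outside this hunk, the success check is written against an assumed isOK() accessor; consult Status.h for the real name:

    #include "Config.h"

    #include <iostream>
    #include <string>

    int main() {
        hdfs::Config conf =
            hdfs::Config::CreateFromXmlFile("hdfs-client.xml");  // made-up path

        std::string uri;
        int32_t timeout = 0;

        // With a default supplied, a missing key is not an error.
        conf.getString("dfs.default.uri", "hdfs://localhost:8020", &uri);
        conf.getInt32("rpc.client.connect.timeout", 600 * 1000, &timeout);

        // Without a default, the returned Status reports a missing or
        // malformed key. isOK() is an assumed name; see Status.h.
        hdfs::Status s = conf.getInt32("dfs.replication", &timeout);
        if (!s.isOK()) {
            std::cerr << "dfs.replication missing or invalid" << std::endl;
        }

        conf.set("dfs.client.read.shortcircuit", true);
        std::cout << uri << " timeout=" << timeout << std::endl;
        return 0;
    }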

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
new file mode 100644
index 0000000..7bfb3ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Hash.h"
+#include "ConfigImpl.h"
+
+#include <cassert>
+#include <errno.h>
+#include <fstream>
+#include <limits>
+#include <string.h>
+#include <unistd.h>
+#include <vector>
+
+using std::map;
+using std::string;
+using std::vector;
+
+namespace hdfs {
+namespace internal {
+
+typedef map<string, string>::const_iterator Iterator;
+typedef map<string, string> Map;
+
+static int32_t StrToInt32(const char *str) {
+    long retval;
+    char *end = NULL;
+    errno = 0;
+    retval = strtol(str, &end, 0);
+
+    if (EINVAL == errno || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid int32_t type: %s", str);
+    }
+
+    if (ERANGE == errno || retval > std::numeric_limits<int32_t>::max() ||
+        retval < std::numeric_limits<int32_t>::min()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow int32_t type: %s", str);
+    }
+
+    return retval;
+}
+
+static int64_t StrToInt64(const char *str) {
+    long long retval;
+    char *end = NULL;
+    errno = 0;
+    retval = strtoll(str, &end, 0);
+
+    if (EINVAL == errno || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid int64_t type: %s", str);
+    }
+
+    if (ERANGE == errno || retval > std::numeric_limits<int64_t>::max() ||
+        retval < std::numeric_limits<int64_t>::min()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow int64_t type: %s", str);
+    }
+
+    return retval;
+}
+
+static bool StrToBool(const char *str) {
+    bool retval = false;
+
+    if (!strcasecmp(str, "true") || !strcmp(str, "1")) {
+        retval = true;
+    } else if (!strcasecmp(str, "false") || !strcmp(str, "0")) {
+        retval = false;
+    } else {
+        THROW(HdfsBadBoolFoumat, "Invalid bool type: %s", str);
+    }
+
+    return retval;
+}
+
+static double StrToDouble(const char *str) {
+    double retval;
+    char *end = NULL;
+    errno = 0;
+    retval = strtod(str, &end);
+
+    if (EINVAL == errno || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid double type: %s", str);
+    }
+
+    if (ERANGE == errno || retval > std::numeric_limits<double>::max() ||
+        retval < -std::numeric_limits<double>::max()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow double type: %s", str);
+    }
+
+    return retval;
+}
+
+ConfigImpl::ConfigImpl(const Map &kv) : kv(kv) {
+}
+
+const char *ConfigImpl::getString(const std::string &key) const {
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    return it->second.c_str();
+}
+
+const char *ConfigImpl::getString(const std::string &key,
+                                  const std::string &def) const {
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        return def.c_str();
+    } else {
+        return it->second.c_str();
+    }
+}
+
+int64_t ConfigImpl::getInt64(const std::string &key) const {
+    int64_t retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToInt64(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+int64_t ConfigImpl::getInt64(const std::string &key, int64_t def) const {
+    int64_t retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToInt64(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+int32_t ConfigImpl::getInt32(const std::string &key) const {
+    int32_t retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToInt32(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+int32_t ConfigImpl::getInt32(const std::string &key, int32_t def) const {
+    int32_t retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToInt32(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+double ConfigImpl::getDouble(const std::string &key) const {
+    double retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToDouble(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+double ConfigImpl::getDouble(const std::string &key, double def) const {
+    double retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToDouble(it->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+bool ConfigImpl::getBool(const std::string &key) const {
+    bool retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    try {
+        retval = StrToBool(it->second.c_str());
+    } catch (const HdfsBadBoolFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+bool ConfigImpl::getBool(const std::string &key, bool def) const {
+    bool retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToBool(it->second.c_str());
+    } catch (const HdfsBadBoolFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound,
+                     "ConfigImpl key: %s has an invalid value", key.c_str());
+    }
+
+    return retval;
+}
+
+size_t ConfigImpl::hash_value() const {
+    vector<size_t> values;
+    map<string, string>::const_iterator s, e;
+    e = kv.end();
+
+    for (s = kv.begin(); s != e; ++s) {
+        values.push_back(StringHasher(s->first));
+        values.push_back(StringHasher(s->second));
+    }
+
+    if (values.empty()) {
+        return 0;  // avoid taking &values[0] on an empty vector
+    }
+
+    return CombineHasher(&values[0], values.size());
+}
+}
+}
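
The StrTo* helpers above all follow the classic strtol/strtod validation protocol: clear errno, parse, then check both errno and the end pointer before trusting the result. A standalone sketch of that protocol without the HDFS exception types; it also checks end == str, the portable way to detect that nothing was parsed, since strtol is not required to set EINVAL:

    #include <errno.h>
    #include <limits.h>
    #include <stdio.h>
    #include <stdlib.h>

    // Returns 1 and stores the value in *out on success, 0 on failure.
    static int ParseInt32(const char *str, int *out) {
        char *end = NULL;
        long v;
        errno = 0;
        v = strtol(str, &end, 0);  // base 0 accepts decimal, octal, and hex
        if (end == str || *end != '\0') {
            return 0;  // nothing parsed, or trailing junk such as "12abc"
        }
        if (ERANGE == errno || v > INT_MAX || v < INT_MIN) {
            return 0;  // out of range for a 32-bit value
        }
        *out = (int)v;
        return 1;
    }

    int main(void) {
        int v = 0;
        printf("%d\n", ParseInt32("0x1F", &v) ? v : -1);    // prints 31
        printf("%d\n", ParseInt32("12junk", &v) ? v : -1);  // prints -1
        return 0;
    }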

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
new file mode 100644
index 0000000..5d058dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_COMMON_CONFIGIMPL_H_
+#define _HDFS_LIBHDFS3_COMMON_CONFIGIMPL_H_
+
+#include <stdint.h>
+#include <string>
+#include <sstream>
+#include <map>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * A configuration file parser.
+ */
+class ConfigImpl {
+public:
+    ConfigImpl() {
+    }
+
+    /**
+     * Construct a ConfigImpl instance from the given key-value map.
+     */
+    ConfigImpl(const std::map<std::string, std::string> &kv);
+
+    /**
+     * Equality operator.
+     */
+    bool operator==(const ConfigImpl &other) const {
+        if (this == &other) {
+            return true;
+        }
+
+        return this->kv == other.kv;
+    }
+
+    /**
+     * Get a string value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    const char *getString(const std::string &key) const;
+
+    /**
+     * Get a string value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    const char *getString(const std::string &key, const std::string &def) const;
+
+    /**
+     * Get a 64-bit integer value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    int64_t getInt64(const std::string &key) const;
+
+    /**
+     * Get a 64-bit integer value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    int64_t getInt64(const std::string &key, int64_t def) const;
+
+    /**
+     * Get a 32-bit integer value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    int32_t getInt32(const std::string &key) const;
+
+    /**
+     * Get a 32-bit integer value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    int32_t getInt32(const std::string &key, int32_t def) const;
+
+    /**
+     * Get a double value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    double getDouble(const std::string &key) const;
+
+    /**
+     * Get a double value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    double getDouble(const std::string &key, double def) const;
+
+    /**
+     * Get a boolean value for the given configuration key.
+     * @param key The key of the configuration item.
+     * @return The value of the configuration item.
+     * @throw HdfsConfigNotFound
+     */
+    bool getBool(const std::string &key) const;
+
+    /**
+     * Get a boolean value for the given configuration key.
+     * Return the default value def if the key is not found.
+     * @param key The key of the configuration item.
+     * @param def The default value.
+     * @return The value of the configuration item.
+     */
+    bool getBool(const std::string &key, bool def) const;
+
+    /**
+     * Set a configuration item.
+     * @param key The key to set.
+     * @param value The value to set it to.
+     */
+    template <typename T>
+    void set(const std::string &key, T const &value) {
+        std::stringstream ss;
+        ss << value;
+        kv[key] = ss.str();
+    }
+
+    /**
+     * Get the hash value of this object
+     *
+     * @return The hash value
+     */
+    size_t hash_value() const;
+
+private:
+    std::string path;
+    std::map<std::string, std::string> kv;
+};
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_COMMON_CONFIGIMPL_H_ */
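
ConfigImpl keeps every value as a string, so the templated set() above serializes through operator<< and the typed getters parse the text back. A minimal sketch of that round-trip with just a map and a stringstream; the key names are made up. Note that a bool streams as "1" or "0" by default, which is exactly why StrToBool accepts those spellings:

    #include <iostream>
    #include <map>
    #include <sstream>
    #include <string>

    static std::map<std::string, std::string> kv;

    // Same shape as ConfigImpl::set(): any streamable value becomes a string.
    template <typename T>
    void Set(const std::string &key, const T &value) {
        std::stringstream ss;
        ss << value;
        kv[key] = ss.str();
    }

    int main() {
        Set("dfs.replication", 3);
        Set("dfs.ratio", 0.75);   // made-up key
        Set("dfs.verify", true);  // stored as "1"
        std::cout << kv["dfs.replication"] << " "
                  << kv["dfs.ratio"] << " "
                  << kv["dfs.verify"] << std::endl;  // prints "3 0.75 1"
        return 0;
    }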