You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/25 01:21:27 UTC
[3/4] HDFS-7014. Implement input streams and file system
functionality (zhwangzw via cmccabe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc
new file mode 100644
index 0000000..3168fe4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.cc
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FileSystemImpl.h"
+#include "InputStream.h"
+#include "InputStreamImpl.h"
+#include "StatusInternal.h"
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+
+/**
+ * Construct a stream wrapper that owns a newly allocated implementation
+ * object. The implementation is released again in the destructor.
+ */
+InputStream::InputStream() {
+    this->impl = new internal::InputStreamImpl();
+}
+
+/**
+ * Destroy the wrapper and the implementation object it owns.
+ */
+InputStream::~InputStream() {
+    delete this->impl;
+}
+
+/**
+ * Open a file for reading.
+ *
+ * Every failure, including a file system that is not connected, is
+ * reported through the returned status and recorded as the last error;
+ * no exception leaves this function.
+ *
+ * @param fs the file system the file lives in.
+ * @param path the file to be read.
+ * @param verifyChecksum verify the checksum while reading.
+ * @return the result status of this operation.
+ */
+Status InputStream::open(FileSystem &fs, const std::string &path,
+                         bool verifyChecksum) {
+    try {
+        /*
+         * Raised inside the try block so that it is converted into a
+         * Status like any other failure instead of escaping to the caller.
+         */
+        if (!fs.impl) {
+            THROW(HdfsIOException, "FileSystem: not connected.");
+        }
+
+        impl->open(fs.impl, path.c_str(), verifyChecksum);
+    } catch (...) {
+        return lastError = CreateStatusFromException(current_exception());
+    }
+
+    return lastError = Status::OK();
+}
+
+/**
+ * Read up to size bytes into buf.
+ *
+ * The outcome is also stored as the last error status.
+ *
+ * @param buf the buffer to fill.
+ * @param size the buffer size.
+ * @return the value returned by the implementation, or -1 if it threw.
+ */
+int32_t InputStream::read(char *buf, int32_t size) {
+    int32_t bytesRead = -1;
+
+    try {
+        bytesRead = impl->read(buf, size);
+        lastError = Status::OK();
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+    }
+
+    return bytesRead;
+}
+
+/**
+ * Read exactly size bytes into buf.
+ *
+ * @param buf the buffer to fill.
+ * @param size the number of bytes to be read.
+ * @return the result status of this operation, which is also stored as
+ * the last error status.
+ */
+Status InputStream::readFully(char *buf, int64_t size) {
+    try {
+        impl->readFully(buf, size);
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+        return lastError;
+    }
+
+    lastError = Status::OK();
+    return lastError;
+}
+
+/**
+ * Ask the implementation how many bytes it reports as available.
+ *
+ * The outcome is also stored as the last error status.
+ *
+ * @return the value returned by the implementation, or -1 if it threw.
+ */
+int64_t InputStream::available() {
+    int64_t byteCount = -1;
+
+    try {
+        byteCount = impl->available();
+        lastError = Status::OK();
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+    }
+
+    return byteCount;
+}
+
+/**
+ * Move the file pointer to the given position.
+ *
+ * @param pos the target position.
+ * @return the result status of this operation, which is also stored as
+ * the last error status.
+ */
+Status InputStream::seek(int64_t pos) {
+    try {
+        impl->seek(pos);
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+        return lastError;
+    }
+
+    lastError = Status::OK();
+    return lastError;
+}
+
+/**
+ * Get the current position of the file pointer.
+ *
+ * The outcome is also stored as the last error status.
+ *
+ * @return the value returned by the implementation, or -1 if it threw.
+ */
+int64_t InputStream::tell() {
+    int64_t position = -1;
+
+    try {
+        position = impl->tell();
+        lastError = Status::OK();
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+    }
+
+    return position;
+}
+
+/**
+ * Close the stream.
+ *
+ * @return the result status of this operation, which is also stored as
+ * the last error status.
+ */
+Status InputStream::close() {
+    try {
+        impl->close();
+    } catch (...) {
+        lastError = CreateStatusFromException(current_exception());
+        return lastError;
+    }
+
+    lastError = Status::OK();
+    return lastError;
+}
+
+/**
+ * Get the status recorded by the most recent operation on this stream.
+ *
+ * @return a copy of the stored status.
+ */
+Status InputStream::getLastError() {
+    return this->lastError;
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
index ddd9434..bd7a5dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStream.h
@@ -19,20 +19,28 @@
#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_
#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_
-#include "FileSystem.h"
+#include "Status.h"
namespace hdfs {
namespace internal {
-class InputStreamInter;
+class InputStreamImpl;
}
+class FileSystem;
+
/**
* A input stream used read data from hdfs.
*/
class InputStream {
public:
+ /**
+ * Construct an instance.
+ */
InputStream();
+ /**
+ * Destroy this instance.
+ */
~InputStream();
/**
@@ -40,51 +48,66 @@ public:
* @param fs hdfs file system.
* @param path the file to be read.
* @param verifyChecksum verify the checksum.
+ * @return the result status of this operation
*/
- void open(FileSystem & fs, const char * path, bool verifyChecksum = true);
+ Status open(FileSystem &fs, const std::string &path,
+ bool verifyChecksum = true);
/**
* To read data from hdfs.
* @param buf the buffer used to filled.
* @param size buffer size.
- * @return return the number of bytes filled in the buffer, it may less than size.
+ * @return the number of bytes filled in the buffer, which may be less than
+ * size, or -1 on error.
*/
- int32_t read(char * buf, int32_t size);
+ int32_t read(char *buf, int32_t size);
/**
* To read data from hdfs, block until get the given size of bytes.
* @param buf the buffer used to filled.
* @param size the number of bytes to be read.
+ * @return the result status of this operation
*/
- void readFully(char * buf, int64_t size);
+ Status readFully(char *buf, int64_t size);
/**
* Get how many bytes can be read without blocking.
- * @return The number of bytes can be read without blocking.
+ * @return The number of bytes can be read without blocking, -1 on error.
*/
int64_t available();
/**
* To move the file point to the given position.
* @param pos the given position.
+ * @return the result status of this operation
*/
- void seek(int64_t pos);
+ Status seek(int64_t pos);
/**
* To get the current file point position.
- * @return the position of current file point.
+ * @return the position of current file pointer, -1 on error.
*/
int64_t tell();
/**
* Close the stream.
+ * @return the result status of this operation
*/
- void close();
+ Status close();
+
+ /**
+ * Get the error status of the last operation.
+ * @return the error status of the last operation.
+ */
+ Status getLastError();
private:
- Internal::InputStreamInter * impl;
-};
+ InputStream(const InputStream &other);
+ InputStream &operator=(InputStream &other);
+ internal::InputStreamImpl *impl;
+ Status lastError;
+};
}
#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc
new file mode 100644
index 0000000..ca4c645
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.cc
@@ -0,0 +1,919 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemImpl.h"
+#include "InputStreamImpl.h"
+#include "LocalBlockReader.h"
+#include "Logger.h"
+#include "RemoteBlockReader.h"
+#include "Thread.h"
+#include "UnorderedMap.h"
+#include "server/Datanode.h"
+
+#include <algorithm>
+#include <ifaddrs.h>
+#include <inttypes.h>
+#include <iostream>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace hdfs {
+namespace internal {
+
+mutex InputStreamImpl::MutLocalBlockInforCache; // locked by getBlockLocalPathInfoCache() while it reads or updates the map below
+unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>>
+ InputStreamImpl::LocalBlockInforCache; // one block-local-path cache per port value passed to getBlockLocalPathInfoCache()
+
+/**
+ * Collect the textual names under which this machine can be addressed.
+ *
+ * The result holds the numeric text form of every IPv4 and IPv6 address
+ * reported by getifaddrs() plus the name reported by gethostname().
+ * Interface entries without an address, or with any other address family,
+ * are ignored.
+ *
+ * @return the set of local addresses and the local host name.
+ * @throw HdfsNetworkException if the interfaces cannot be enumerated, an
+ * address cannot be converted to text, or the host name cannot be read.
+ */
+unordered_set<std::string> BuildLocalAddrSet() {
+    unordered_set<std::string> set;
+    struct ifaddrs *ifAddr = NULL;
+
+    if (getifaddrs(&ifAddr)) {
+        THROW(HdfsNetworkException,
+              "InputStreamImpl: cannot get local network interface: %s",
+              GetSystemErrorInfo(errno));
+    }
+
+    try {
+        std::vector<char> host(INET6_ADDRSTRLEN + 1);
+
+        for (struct ifaddrs *pifAddr = ifAddr; pifAddr != NULL;
+             pifAddr = pifAddr->ifa_next) {
+            struct sockaddr *addr = pifAddr->ifa_addr;
+
+            /*
+             * getifaddrs() may report an interface entry whose ifa_addr
+             * is a null pointer; there is nothing to convert for it.
+             */
+            if (NULL == addr) {
+                continue;
+            }
+
+            const void *rawAddr = NULL;
+
+            if (addr->sa_family == AF_INET) {
+                rawAddr =
+                    &(reinterpret_cast<struct sockaddr_in *>(addr))->sin_addr;
+            } else if (addr->sa_family == AF_INET6) {
+                rawAddr =
+                    &(reinterpret_cast<struct sockaddr_in6 *>(addr))->sin6_addr;
+            } else {
+                continue;
+            }
+
+            memset(&host[0], 0, host.size());
+            const char *pHost = inet_ntop(addr->sa_family, rawAddr, &host[0],
+                                          INET6_ADDRSTRLEN);
+
+            if (NULL == pHost) {
+                THROW(HdfsNetworkException,
+                      "InputStreamImpl: cannot get convert network address "
+                      "to textual form: %s",
+                      GetSystemErrorInfo(errno));
+            }
+
+            set.insert(pHost);
+        }
+
+        /*
+         * add hostname.
+         */
+        long hostlen = sysconf(_SC_HOST_NAME_MAX);
+
+        /*
+         * sysconf() returns -1 when the limit is indeterminate or on
+         * error; fall back to 255, the value of _POSIX_HOST_NAME_MAX.
+         */
+        if (hostlen <= 0) {
+            hostlen = 255;
+        }
+
+        /*
+         * Keep one zeroed byte beyond the length handed to gethostname()
+         * so the buffer is NUL-terminated even if the name is truncated.
+         */
+        host.assign(hostlen + 2, '\0');
+
+        if (gethostname(&host[0], host.size() - 1)) {
+            THROW(HdfsNetworkException,
+                  "InputStreamImpl: cannot get hostname: %s",
+                  GetSystemErrorInfo(errno));
+        }
+
+        set.insert(&host[0]);
+    } catch (...) {
+        if (ifAddr != NULL) {
+            freeifaddrs(ifAddr);
+        }
+
+        throw;
+    }
+
+    if (ifAddr != NULL) {
+        freeifaddrs(ifAddr);
+    }
+
+    return set;
+}
+
+InputStreamImpl::InputStreamImpl()
+ : closed(true), // stays true until openInternal() completes successfully
+ localRead(true),
+ readFromUnderConstructedBlock(false),
+ verify(true),
+ maxGetBlockInfoRetry(3), // overwritten from the session configuration in openInternal()
+ cursor(0),
+ endOfCurBlock(0),
+ lastBlockBeingWrittenLength(0),
+ prefetchSize(0) {
+#ifdef MOCK
+ stub = NULL; // no stub installed initially; readBlockLength() then creates a DatanodeImpl
+#endif
+}
+
+InputStreamImpl::~InputStreamImpl() { // empty body: no explicit cleanup is performed here
+}
+
+/**
+ * Verify that the stream is able to serve a request.
+ *
+ * @throw HdfsIOException if the stream is closed. Otherwise, if an earlier
+ * operation stored an error in lastError, that error is thrown again.
+ */
+void InputStreamImpl::checkStatus() {
+    if (closed) {
+        THROW(HdfsIOException, "InputStreamImpl: stream is not opened.");
+    }
+
+    const bool hasStoredError = (lastError != exception_ptr());
+
+    if (hasStoredError) {
+        rethrow_exception(lastError);
+    }
+}
+
+/**
+ * Ask the datanodes holding a block for its visible length.
+ *
+ * The locations of the block are queried in order and the first
+ * non-negative answer wins. Failures are logged and the next location is
+ * tried.
+ *
+ * @param b the block whose visible length is wanted.
+ * @return the visible length; 0 if every location answered with
+ * ReplicaNotFoundException (this includes a block without locations);
+ * -1 if no length could be obtained for any other reason.
+ */
+int64_t InputStreamImpl::readBlockLength(const LocatedBlock &b) {
+    const std::vector<DatanodeInfo> &nodes = b.getLocations();
+    int nodesNotReportingMissing = nodes.size();
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        const DatanodeInfo &node = nodes[i];
+
+        try {
+            shared_ptr<Datanode> dn;
+            RpcAuth blockAuth = auth;
+            blockAuth.getUser().addToken(b.getToken());
+#ifdef MOCK
+
+            if (stub) {
+                dn = stub->getDatanode();
+            } else {
+                dn = shared_ptr<Datanode>(
+                    new DatanodeImpl(node.getIpAddr().c_str(),
+                                     node.getIpcPort(), *conf, blockAuth));
+            }
+
+#else
+            dn = shared_ptr<Datanode>(
+                new DatanodeImpl(node.getIpAddr().c_str(), node.getIpcPort(),
+                                 *conf, blockAuth));
+#endif
+            const int64_t visibleLength = dn->getReplicaVisibleLength(b);
+
+            if (visibleLength >= 0) {
+                return visibleLength;
+            }
+        } catch (const ReplicaNotFoundException &e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block "
+                "visible length for Block: %s file %s from Datanode: %s\n%s",
+                b.toString().c_str(), path.c_str(),
+                node.formatAddress().c_str(), GetExceptionDetail(e));
+            LOG(INFO,
+                "InputStreamImpl: retry get block visible length for Block: "
+                "%s file %s from other datanode",
+                b.toString().c_str(), path.c_str());
+            --nodesNotReportingMissing;
+        } catch (const HdfsIOException &e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block visible length for "
+                "Block: %s file %s from Datanode: %s\n%s",
+                b.toString().c_str(), path.c_str(),
+                node.formatAddress().c_str(), GetExceptionDetail(e));
+            LOG(INFO,
+                "InputStreamImpl: retry get block visible length for Block: "
+                "%s file %s from other datanode",
+                b.toString().c_str(), path.c_str());
+        }
+    }
+
+    /*
+     * The namenode reported these locations, yet none of them knows the
+     * replica: we hit the race between the start and the end of pipeline
+     * creation. Every location must have reported the replica as missing,
+     * because any other failure may have happened on a datanode that does
+     * hold it, and that failure has to be reported instead.
+     */
+    if (0 == nodesNotReportingMissing) {
+        return 0;
+    }
+
+    return -1;
+}
+
+/**
+ * Get block location information from the namenode.
+ *
+ * filesystem->getBlockLocations() is called with the file path, the current
+ * cursor and prefetchSize, and fills lbs (created first if it is empty).
+ * If the last block is not complete, its visible length is obtained with
+ * readBlockLength(), stored in lastBlockBeingWrittenLength and written into
+ * the block itself; otherwise lastBlockBeingWrittenLength is set to 0.
+ *
+ * The whole procedure is attempted up to maxGetBlockInfoRetry times. An
+ * attempt is repeated when readBlockLength() returns -1 (after a pause of
+ * four seconds) or when an HdfsRpcException is caught.
+ *
+ * @throw HdfsIOException if the visible length of the last block is still
+ * unavailable on the final attempt.
+ * @throw HdfsRpcException rethrown if it occurs on the final attempt.
+ */
+void InputStreamImpl::updateBlockInfos() {
+ int retry = maxGetBlockInfoRetry;
+
+ for (int i = 0; i < retry; ++i) {
+ try {
+ if (!lbs) {
+ lbs = shared_ptr<LocatedBlocks>(new LocatedBlocks);
+ }
+
+ filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs);
+
+ if (lbs->isLastBlockComplete()) {
+ lastBlockBeingWrittenLength = 0;
+ } else {
+ shared_ptr<LocatedBlock> last = lbs->getLastBlock();
+
+ if (!last) {
+ lastBlockBeingWrittenLength = 0;
+ } else {
+ lastBlockBeingWrittenLength = readBlockLength(*last);
+
+ if (lastBlockBeingWrittenLength == -1) {
+ if (i + 1 >= retry) {
+ THROW(HdfsIOException,
+ "InputStreamImpl: failed "
+ "to get block visible length for Block: "
+ "%s from all Datanode.",
+ last->toString().c_str());
+ } else {
+ LOG(LOG_ERROR,
+ "InputStreamImpl: failed to get block visible "
+ "length for Block: %s file %s from all "
+ "Datanode.",
+ last->toString().c_str(), path.c_str());
+
+ try {
+ sleep_for(milliseconds(4000));
+ } catch (...) {
+ }
+
+ continue;
+ }
+ }
+
+ last->setNumBytes(lastBlockBeingWrittenLength);
+ }
+ }
+
+ return;
+ } catch (const HdfsRpcException &e) {
+ LOG(LOG_ERROR,
+ "InputStreamImpl: failed to get block information "
+ "for file %s, %s",
+ path.c_str(), GetExceptionDetail(e));
+
+ if (i + 1 >= retry) {
+ throw;
+ }
+ }
+
+ LOG(INFO,
+ "InputStreamImpl: retry to get block information for "
+ "file: %s, already tried %d time(s).",
+ path.c_str(), i + 1);
+ }
+}
+
+/**
+ * Get the length of the file as currently known.
+ *
+ * @return the length reported by the located blocks, increased by
+ * lastBlockBeingWrittenLength when the last block is not complete.
+ */
+int64_t InputStreamImpl::getFileLength() {
+    const int64_t reportedLength = lbs->getFileLength();
+
+    if (lbs->isLastBlockComplete()) {
+        return reportedLength;
+    }
+
+    return reportedLength + lastBlockBeingWrittenLength;
+}
+
+/**
+ * Make the given block the current block.
+ *
+ * A copy of the block becomes curBlock, endOfCurBlock is recomputed from
+ * it, the list of failed nodes is emptied and the block reader is dropped.
+ * readFromUnderConstructedBlock is set when the cursor lies at or beyond
+ * the file length reported by the located blocks.
+ *
+ * @param lb the block containing the current cursor.
+ */
+void InputStreamImpl::seekToBlock(const LocatedBlock &lb) {
+    readFromUnderConstructedBlock = (cursor >= lbs->getFileLength());
+    assert(!readFromUnderConstructedBlock || !lbs->isLastBlockComplete());
+    assert(cursor >= lb.getOffset() &&
+           cursor < lb.getOffset() + lb.getNumBytes());
+
+    curBlock = shared_ptr<LocatedBlock>(new LocatedBlock(lb));
+
+    const int64_t numBytes = curBlock->getNumBytes();
+    assert(numBytes > 0);
+    endOfCurBlock = curBlock->getOffset() + numBytes;
+
+    failedNodes.clear();
+    blockReader.reset();
+}
+
+/**
+ * Select the first location of the current block that is not listed in
+ * failedNodes and store it in curNode.
+ *
+ * failedNodes is searched with a binary search, so it has to be sorted.
+ *
+ * @return true if a node was selected, false if every location has failed.
+ */
+bool InputStreamImpl::choseBestNode() {
+    const std::vector<DatanodeInfo> &candidates = curBlock->getLocations();
+
+    for (std::vector<DatanodeInfo>::const_iterator it = candidates.begin();
+         it != candidates.end(); ++it) {
+        const bool alreadyFailed =
+            std::binary_search(failedNodes.begin(), failedNodes.end(), *it);
+
+        if (!alreadyFailed) {
+            curNode = *it;
+            return true;
+        }
+    }
+
+    return false;
+}
+
+/**
+ * Check whether the IP address of curNode is one of the names collected by
+ * BuildLocalAddrSet(). The set is built once, on the first call.
+ *
+ * @return true if the address of curNode is found in the set.
+ */
+bool InputStreamImpl::isLocalNode() {
+    static const unordered_set<std::string> LocalAddrSet = BuildLocalAddrSet();
+    return LocalAddrSet.count(curNode.getIpAddr()) > 0;
+}
+
+/**
+ * Get the local path information of a block, asking the cache first.
+ *
+ * On a cache miss the information is requested from the datanode at
+ * curNode and then inserted into the cache.
+ *
+ * @param cache the cache to look up and to update.
+ * @param b the block whose local path information is wanted.
+ * @return the local path information of the block.
+ * @throw HdfsIOException on failure; any other HdfsException is wrapped in
+ * an HdfsIOException.
+ */
+BlockLocalPathInfo InputStreamImpl::getBlockLocalPathInfo(
+    LocalBlockInforCacheType &cache, const LocatedBlock &b) {
+    BlockLocalPathInfo info;
+
+    try {
+        const LocalBlockInforCacheKey key(b.getBlockId(), b.getPoolId());
+        const bool cached = cache.find(key, info);
+
+        if (!cached) {
+            /*
+             * only kerberos based authentication is allowed, do not add
+             * token
+             */
+            RpcAuth a = auth;
+            shared_ptr<Datanode> dn(new DatanodeImpl(
+                curNode.getIpAddr().c_str(), curNode.getIpcPort(), *conf, a));
+            dn->getBlockLocalPathInfo(b, b.getToken(), info);
+            cache.insert(key, info);
+        }
+    } catch (const HdfsIOException &) {
+        throw;
+    } catch (const HdfsException &) {
+        NESTED_THROW(
+            HdfsIOException,
+            "InputStreamImpl: Failed to get block local path information.");
+    }
+
+    return info;
+}
+
+/**
+ * Remove the entry of a block from a local path information cache.
+ *
+ * @param cache the cache to remove the entry from.
+ * @param b the block whose entry is removed.
+ */
+void InputStreamImpl::invalidCacheEntry(LocalBlockInforCacheType &cache,
+                                        const LocatedBlock &b) {
+    const LocalBlockInforCacheKey key(b.getBlockId(), b.getPoolId());
+    cache.erase(key);
+}
+
+/**
+ * Get the local path information cache that belongs to a port, creating
+ * it on first use with the configured maximum size.
+ *
+ * The lookup and the insertion happen while MutLocalBlockInforCache is
+ * held.
+ *
+ * @param port the port identifying the cache.
+ * @return a reference to the cache stored for that port.
+ */
+LocalBlockInforCacheType &InputStreamImpl::getBlockLocalPathInfoCache(
+    uint32_t port) {
+    typedef unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>>
+        CacheMap;
+    lock_guard<mutex> lock(MutLocalBlockInforCache);
+    CacheMap::iterator found = LocalBlockInforCache.find(port);
+
+    if (found != LocalBlockInforCache.end()) {
+        return *(found->second);
+    }
+
+    shared_ptr<LocalBlockInforCacheType> created(
+        new LocalBlockInforCacheType(conf->getMaxLocalBlockInfoCacheSize()));
+    LocalBlockInforCache[port] = created;
+    return *created;
+}
+
+void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) { // creates blockReader for curBlock at the cursor; throws HdfsIOException once every location has failed
+ bool lastReadFromLocal = false; // true while the most recent attempt on curNode used the local block reader
+ exception_ptr lastException; // most recent HdfsIOException, nested into the final error
+
+ while (true) {
+ if (!choseBestNode()) { // every location of curBlock is already in failedNodes
+ try {
+ if (lastException) {
+ rethrow_exception(lastException);
+ }
+ } catch (...) {
+ NESTED_THROW(
+ HdfsIOException,
+ "InputStreamImpl: all nodes have been tried and no valid "
+ "replica can be read for Block: %s.",
+ curBlock->toString().c_str());
+ }
+
+ THROW(HdfsIOException,
+ "InputStreamImpl: all nodes have been tried and no valid "
+ "replica can be read for Block: %s.",
+ curBlock->toString().c_str());
+ }
+
+ try {
+ int64_t offset, len;
+ offset = cursor - curBlock->getOffset(); // position of the cursor inside the current block
+ assert(offset >= 0);
+ len = curBlock->getNumBytes() - offset; // bytes left between the cursor and the end of the block
+ assert(len > 0);
+
+ if (auth.getProtocol() == AuthProtocol::NONE &&
+ !temporaryDisableLocalRead && !lastReadFromLocal &&
+ !readFromUnderConstructedBlock && localRead && isLocalNode()) { // all conditions hold: read through LocalBlockReader
+ lastReadFromLocal = true;
+ LocalBlockInforCacheType &cache =
+ getBlockLocalPathInfoCache(curNode.getXferPort());
+ BlockLocalPathInfo info =
+ getBlockLocalPathInfo(cache, *curBlock);
+ assert(curBlock->getBlockId() == info.getBlock().getBlockId() &&
+ curBlock->getPoolId() == info.getBlock().getPoolId());
+ LOG(DEBUG2,
+ "%p setup local block reader for file %s from "
+ "local block %s, block offset %" PRId64
+ ", read block "
+ "length %" PRId64 " end of Block %" PRId64
+ ", local "
+ "block file path %s",
+ this, path.c_str(), curBlock->toString().c_str(), offset,
+ len, offset + len, info.getLocalBlockPath());
+
+ if (0 != access(info.getLocalMetaPath(), R_OK)) { // meta file not readable: drop the cache entry and loop again (lastReadFromLocal is now true)
+ invalidCacheEntry(cache, *curBlock);
+ continue;
+ }
+
+ try {
+ blockReader = shared_ptr<BlockReader>(
+ new LocalBlockReader(info, *curBlock, offset, verify,
+ *conf, localReaderBuffer));
+ } catch (...) {
+ invalidCacheEntry(cache, *curBlock); // the cached path information did not yield a reader, so discard it
+ throw;
+ }
+ } else {
+ const char *clientName;
+ LOG(DEBUG2,
+ "%p setup remote block reader for file %s from "
+ "remote block %s, block offset %" PRId64
+ ""
+ ", read block length %" PRId64 " end of block %" PRId64
+ ", from node %s",
+ this, path.c_str(), curBlock->toString().c_str(), offset,
+ len, offset + len, curNode.formatAddress().c_str());
+ clientName = filesystem->getClientName();
+ lastReadFromLocal = false;
+ blockReader = shared_ptr<BlockReader>(new RemoteBlockReader(
+ *curBlock, curNode, offset, len, curBlock->getToken(),
+ clientName, verify, *conf));
+ }
+
+ break; // a block reader has been created
+ } catch (const HdfsIOException &e) {
+ lastException = current_exception();
+
+ if (lastReadFromLocal) {
+ LOG(LOG_ERROR,
+ "cannot setup block reader for Block: %s file %s "
+ "on Datanode: %s.\n%s\n"
+ "retry the same node but disable reading from local block",
+ curBlock->toString().c_str(), path.c_str(),
+ curNode.formatAddress().c_str(), GetExceptionDetail(e));
+ /*
+ * Do not add the node to failedNodes: the same node is tried
+ * again, and because lastReadFromLocal is still true the next
+ * attempt takes the remote block reader branch.
+ */
+ } else {
+ LOG(LOG_ERROR,
+ "cannot setup block reader for Block: %s file %s on "
+ "Datanode: %s.\n%s\nretry another node",
+ curBlock->toString().c_str(), path.c_str(),
+ curNode.formatAddress().c_str(), GetExceptionDetail(e));
+ failedNodes.push_back(curNode);
+ std::sort(failedNodes.begin(), failedNodes.end()); // choseBestNode() uses a binary search on failedNodes
+ }
+ }
+ }
+}
+
+/**
+ * Open a file for reading.
+ *
+ * If opening fails, close() is called to reset the stream before the
+ * error is passed on.
+ *
+ * @param fs the file system the file lives in.
+ * @param path the file to be read, must be neither NULL nor empty.
+ * @param verifyChecksum verify the checksum while reading.
+ * @throw InvalidParameter if path is NULL or empty.
+ */
+void InputStreamImpl::open(shared_ptr<FileSystemImpl> fs, const char *path,
+                           bool verifyChecksum) {
+    const bool pathMissing = (NULL == path) || ('\0' == path[0]);
+
+    if (pathMissing) {
+        THROW(InvalidParameter, "path is invalid.");
+    }
+
+    try {
+        openInternal(fs, path, verifyChecksum);
+    } catch (...) {
+        close();
+        throw;
+    }
+}
+
+/**
+ * Initialize the stream from the file system and its configuration, fetch
+ * the block information of the file and mark the stream as opened.
+ *
+ * @param fs the file system the file lives in.
+ * @param path the file to be read.
+ * @param verifyChecksum verify the checksum while reading.
+ * @throw HdfsCanceled, FileNotFoundException passed on unchanged.
+ * @throw HdfsIOException wrapping any other HdfsException.
+ */
+void InputStreamImpl::openInternal(shared_ptr<FileSystemImpl> fs,
+                                   const char *path, bool verifyChecksum) {
+    try {
+        filesystem = fs;
+        verify = verifyChecksum;
+        this->path = fs->getStandardPath(path);
+        const char *verifyText = verifyChecksum ? "true" : "false";
+        LOG(DEBUG2, "%p, open file %s for read, verfyChecksum is %s", this,
+            this->path.c_str(), verifyText);
+        conf = shared_ptr<SessionConfig>(new SessionConfig(fs->getConf()));
+        this->auth = RpcAuth(fs->getUserInfo(),
+                             RpcAuth::ParseMethod(conf->getRpcAuthMethod()));
+        prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize();
+        localRead = conf->isReadFromLocal();
+        maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry();
+        updateBlockInfos();
+        /*
+         * Only a stream whose block information could be fetched counts
+         * as opened.
+         */
+        closed = false;
+    } catch (const HdfsCanceled &) {
+        throw;
+    } catch (const FileNotFoundException &) {
+        throw;
+    } catch (const HdfsException &) {
+        NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.",
+                     this->path.c_str());
+    }
+}
+
+/**
+ * Read up to size bytes into buf.
+ *
+ * Any failure other than HdfsEndOfStream is stored in lastError before it
+ * is passed on, so that checkStatus() reports it on later calls.
+ *
+ * @param buf the buffer to fill.
+ * @param size the buffer size.
+ * @return the number of bytes filled in the buffer.
+ */
+int32_t InputStreamImpl::read(char *buf, int32_t size) {
+    checkStatus();
+
+    try {
+        const int64_t startPos = cursor;
+        const int32_t bytesRead = readInternal(buf, size);
+        LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64
+            " done %d, "
+            "next pos %" PRId64,
+            this, path.c_str(), size, startPos, bytesRead, cursor);
+        return bytesRead;
+    } catch (const HdfsEndOfStream &) {
+        throw;
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+int32_t InputStreamImpl::readOneBlock(char *buf, int32_t size,
+ bool shouldUpdateMetadataOnFailure) { // returns the number of bytes read from the current block, or -1 to make the caller refresh the block information
+ bool temporaryDisableLocalRead = false; // set after a local block reader failed, so the next setup skips local reading
+
+ while (true) {
+ try {
+ /*
+ * Setup block reader here and handle failure.
+ * The flag that disables local reading applies to a single setup
+ * only and is cleared right afterwards.
+ */
+ if (!blockReader) {
+ setupBlockReader(temporaryDisableLocalRead);
+ temporaryDisableLocalRead = false;
+ }
+ } catch (const HdfsInvalidBlockToken &e) {
+ LOG(LOG_ERROR,
+ "InputStreamImpl: failed to read Block: %s file %s, \n%s, "
+ "retry after updating block informations.",
+ curBlock->toString().c_str(), path.c_str(),
+ GetExceptionDetail(e));
+ return -1; // returned regardless of shouldUpdateMetadataOnFailure
+ } catch (const HdfsIOException &e) {
+ /*
+ * In setupBlockReader, we have tried all the replicas.
+ * We now update block informations once, and try again.
+ */
+ if (shouldUpdateMetadataOnFailure) {
+ LOG(LOG_ERROR,
+ "InputStreamImpl: failed to read Block: %s file %s, \n%s, "
+ "retry after updating block informations.",
+ curBlock->toString().c_str(), path.c_str(),
+ GetExceptionDetail(e));
+ return -1;
+ } else {
+ /*
+ * We have updated block informations and failed again.
+ */
+ throw;
+ }
+ }
+
+ /*
+ * Block reader has been setup, read from block reader.
+ */
+ try {
+ int32_t todo = size;
+ todo = todo < endOfCurBlock - cursor
+ ? todo
+ : static_cast<int32_t>(endOfCurBlock - cursor); // never read beyond the end of the current block
+ assert(blockReader);
+ todo = blockReader->read(buf, todo);
+ cursor += todo;
+ /*
+ * Exit the loop and function from here if success.
+ */
+ return todo;
+ } catch (const HdfsIOException &e) {
+ /*
+ * Failed to read from current block reader,
+ * add the current datanode to invalid node list and try again.
+ */
+ LOG(LOG_ERROR,
+ "InputStreamImpl: failed to read Block: %s file %s from "
+ "Datanode: %s, \n%s, "
+ "retry read again from another Datanode.",
+ curBlock->toString().c_str(), path.c_str(),
+ curNode.formatAddress().c_str(), GetExceptionDetail(e));
+
+ if (conf->doesNotRetryAnotherNode()) { // configuration forbids trying another node: pass the error on
+ throw;
+ }
+ } catch (const ChecksumException &e) {
+ LOG(LOG_ERROR,
+ "InputStreamImpl: failed to read Block: %s file %s "
+ "from Datanode: %s, \n%s, retry read again from "
+ "another Datanode.",
+ curBlock->toString().c_str(), path.c_str(),
+ curNode.formatAddress().c_str(), GetExceptionDetail(e));
+ }
+
+ /*
+ * Successfully create the block reader but failed to read.
+ * Disable the local block reader and try the same node again.
+ */
+ if (!blockReader ||
+ dynamic_cast<LocalBlockReader *>(blockReader.get())) {
+ temporaryDisableLocalRead = true;
+ } else {
+ /*
+ * Remote block reader failed to read, try another node.
+ */
+ LOG(INFO,
+ "IntputStreamImpl: Add invalid datanode %s to failed "
+ "datanodes and try another datanode again for file %s.",
+ curNode.formatAddress().c_str(), path.c_str());
+ failedNodes.push_back(curNode);
+ std::sort(failedNodes.begin(), failedNodes.end()); // keep failedNodes sorted for the binary search in choseBestNode()
+ }
+
+ blockReader.reset(); // forces a new setupBlockReader() call on the next iteration
+ }
+}
+
+/**
+ * Read data from hdfs, from at most one block per call.
+ *
+ * Block information is refreshed from the namenode when it is missing or
+ * does not cover the cursor, the block containing the cursor is selected,
+ * and readOneBlock() is called. When readOneBlock() returns a negative
+ * value, the block information is discarded and the procedure is repeated
+ * after a pause of one second.
+ *
+ * @param buf the buffer to fill.
+ * @param size the buffer size.
+ * @return the number of bytes filled in the buffer, which may be less than
+ * size.
+ * @throw HdfsEndOfStream if the cursor is at or beyond the end of the file
+ * after the block information has been refreshed.
+ * @throw HdfsCanceled passed on unchanged.
+ * @throw HdfsIOException wrapping any other HdfsException.
+ */
+int32_t InputStreamImpl::readInternal(char *buf, int32_t size) {
+ int updateMetadataOnFailure = conf->getMaxReadBlockRetry(); // number of failed block reads that may still be answered with a metadata refresh
+
+ try {
+ do {
+ const LocatedBlock *lb = NULL;
+
+ /*
+ * Check if we have got the block information we need.
+ */
+ if (!lbs || cursor >= getFileLength() ||
+ (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) {
+ /*
+ * Get block information from namenode.
+ * Do RPC failover work in updateBlockInfos.
+ */
+ updateBlockInfos();
+
+ /*
+ * We already have the up-to-date block information,
+ * Check if we reach the end of file.
+ */
+ if (cursor >= getFileLength()) {
+ THROW(HdfsEndOfStream,
+ "InputStreamImpl: read over EOF, current position: "
+ "%" PRId64 ", read size: %d, from file: %s",
+ cursor, size, path.c_str());
+ }
+ }
+
+ /*
+ * If we reach the end of block or the block information has just
+ * updated,
+ * seek to the right block to read.
+ */
+ if (cursor >= endOfCurBlock) {
+ lb = lbs->findBlock(cursor);
+
+ if (!lb) {
+ THROW(HdfsIOException,
+ "InputStreamImpl: cannot find block information at "
+ "position: %" PRId64 " for file: %s",
+ cursor, path.c_str());
+ }
+
+ /*
+ * Seek to the right block, setup all needed variable,
+ * but do not setup block reader, setup it latter.
+ */
+ seekToBlock(*lb);
+ }
+
+ int32_t retval =
+ readOneBlock(buf, size, updateMetadataOnFailure > 0);
+
+ /*
+ * Now we have tried all replicas and failed.
+ * We will update metadata once and try again.
+ */
+ if (retval < 0) {
+ lbs.reset(); // makes the next iteration call updateBlockInfos()
+ endOfCurBlock = 0; // makes the next iteration look the block up again
+ --updateMetadataOnFailure;
+
+ try {
+ sleep_for(seconds(1));
+ } catch (...) {
+ }
+
+ continue;
+ }
+
+ return retval;
+ } while (true);
+ } catch (const HdfsCanceled &e) {
+ throw;
+ } catch (const HdfsEndOfStream &e) {
+ throw;
+ } catch (const HdfsException &e) {
+ /*
+ * wrap the underlying error and rethrow.
+ */
+ NESTED_THROW(HdfsIOException,
+ "InputStreamImpl: cannot read file: %s, from "
+ "position %" PRId64 ", size: %d.",
+ path.c_str(), cursor, size);
+ }
+}
+
+/**
+ * Read data from hdfs, blocking until the given number of bytes has been
+ * read.
+ *
+ * Any failure other than HdfsEndOfStream is stored in lastError before it
+ * is passed on, so that checkStatus() reports it on later calls.
+ *
+ * @param buf the buffer to fill.
+ * @param size the number of bytes to be read.
+ */
+void InputStreamImpl::readFully(char *buf, int64_t size) {
+    LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64,
+        path.c_str(), size, cursor);
+    checkStatus();
+
+    try {
+        readFullyInternal(buf, size);
+    } catch (const HdfsEndOfStream &) {
+        throw;
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+/**
+ * Call readInternal() repeatedly until size bytes have been read.
+ *
+ * Each request is limited to the largest int32_t value, because
+ * readInternal() takes a 32 bit size.
+ *
+ * @param buf the buffer to fill.
+ * @param size the number of bytes to be read.
+ * @throw HdfsCanceled passed on unchanged.
+ * @throw HdfsEndOfStream if the end of the file is hit first; the message
+ * reports the position and size of the whole request.
+ * @throw HdfsIOException wrapping any other HdfsException.
+ */
+void InputStreamImpl::readFullyInternal(char *buf, int64_t size) {
+    const int64_t startPos = cursor;
+    int64_t remaining = size;
+
+    try {
+        while (remaining > 0) {
+            const int64_t limit = std::numeric_limits<int32_t>::max();
+            const int32_t request =
+                static_cast<int32_t>(std::min(remaining, limit));
+            const int32_t received =
+                readInternal(buf + (size - remaining), request);
+            remaining -= received;
+        }
+    } catch (const HdfsCanceled &) {
+        throw;
+    } catch (const HdfsEndOfStream &) {
+        THROW(HdfsEndOfStream,
+              "InputStreamImpl: read over EOF, current position: %" PRId64
+              ", read size: %" PRId64 ", from file: %s",
+              startPos, size, path.c_str());
+    } catch (const HdfsException &) {
+        NESTED_THROW(HdfsIOException,
+                     "InputStreamImpl: cannot read fully from file: %s, "
+                     "from position %" PRId64 ", size: %" PRId64 ".",
+                     path.c_str(), startPos, size);
+    }
+}
+
+/**
+ * Get the number of bytes the current block reader reports as available.
+ *
+ * A failure of the block reader is stored in lastError before it is
+ * passed on.
+ *
+ * @return the value reported by the block reader, or 0 if there is no
+ * block reader.
+ */
+int64_t InputStreamImpl::available() {
+    checkStatus();
+
+    if (!blockReader) {
+        return 0;
+    }
+
+    try {
+        return blockReader->available();
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+/**
+ * Move the file pointer to the given position.
+ *
+ * Any failure of seekInternal() is stored in lastError before it is passed
+ * on, so that checkStatus() reports it on later calls.
+ *
+ * @param pos the given position.
+ */
+void InputStreamImpl::seek(int64_t pos) {
+ LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this,
+ path.c_str(), pos, cursor);
+ checkStatus();
+
+ try {
+ seekInternal(pos);
+ } catch (...) {
+ lastError = current_exception();
+ throw;
+ }
+}
+
+/**
+ * Move the cursor to the given position.
+ *
+ * A forward seek that stays inside the current block is served by skipping
+ * bytes in the existing block reader. In every other case, and when
+ * skipping fails, the block reader is dropped and only the cursor is
+ * moved; the next read then looks the block up again.
+ *
+ * @param pos the target position, must not be negative.
+ * @throw InvalidParameter if pos is negative.
+ * @throw HdfsEndOfStream if pos is beyond the length of the file after the
+ * block information has been refreshed.
+ */
+void InputStreamImpl::seekInternal(int64_t pos) {
+    /*
+     * A negative cursor would be smaller than both the file length and
+     * endOfCurBlock, so later reads would skip the block lookup entirely.
+     */
+    if (pos < 0) {
+        THROW(InvalidParameter,
+              "InputStreamImpl: seek to negative position: %" PRId64
+              ", in file: %s",
+              pos, path.c_str());
+    }
+
+    if (cursor == pos) {
+        return;
+    }
+
+    if (!lbs || pos > getFileLength()) {
+        updateBlockInfos();
+
+        if (pos > getFileLength()) {
+            THROW(HdfsEndOfStream,
+                  "InputStreamImpl: seek over EOF, current position: %" PRId64
+                  ", seek target: %" PRId64 ", in file: %s",
+                  cursor, pos, path.c_str());
+        }
+    }
+
+    try {
+        if (blockReader && pos > cursor && pos < endOfCurBlock) {
+            blockReader->skip(pos - cursor);
+            cursor = pos;
+            return;
+        }
+    } catch (const HdfsIOException &e) {
+        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64
+            " bytes in current block reader for file %s\n%s",
+            pos - cursor, path.c_str(), GetExceptionDetail(e));
+        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64
+            " for file %s",
+            pos, path.c_str());
+    } catch (const ChecksumException &e) {
+        LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64
+            " bytes in current block reader for file %s\n%s",
+            pos - cursor, path.c_str(), GetExceptionDetail(e));
+        LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64
+            " for file %s",
+            pos, path.c_str());
+    }
+
+    /**
+     * The seek target lies outside the current block, or skipping inside
+     * the current block reader failed. Drop the block reader and move the
+     * cursor to the target position.
+     */
+    endOfCurBlock = 0;
+    blockReader.reset();
+    cursor = pos;
+}
+
+/**
+ * Get the current position of the file pointer.
+ *
+ * checkStatus() is called first, so a closed stream or a stored error
+ * makes this function throw.
+ *
+ * @return the current position of the file pointer.
+ */
+int64_t InputStreamImpl::tell() {
+ checkStatus();
+ LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), cursor);
+ return cursor;
+}
+
+/**
+ * Close the stream.
+ *
+ * The stream is marked as closed and the members assigned below are put
+ * back to the values the constructor gives them or emptied: the references
+ * to the file system, configuration, block reader, current block and
+ * located blocks are dropped, and the stored error is cleared.
+ */
+void InputStreamImpl::close() {
+ LOG(DEBUG2, "%p close file %s for read", this, path.c_str());
+ closed = true;
+ localRead = true;
+ readFromUnderConstructedBlock = false;
+ verify = true;
+ filesystem.reset();
+ cursor = 0;
+ endOfCurBlock = 0;
+ lastBlockBeingWrittenLength = 0;
+ prefetchSize = 0;
+ blockReader.reset();
+ curBlock.reset();
+ lbs.reset();
+ conf.reset();
+ failedNodes.clear();
+ path.clear();
+ localReaderBuffer.resize(0);
+ lastError = exception_ptr();
+}
+
+/**
+ * Describe this stream in a human readable form.
+ *
+ * The path is empty until the stream has been opened and is cleared again
+ * by close(), so an empty path identifies a stream that is not opened.
+ *
+ * @return "InputStream for path <path>" when a path is set, otherwise
+ * "InputStream (not opened)".
+ */
+std::string InputStreamImpl::toString() {
+    if (path.empty()) {
+        return std::string("InputStream (not opened)");
+    }
+
+    return std::string("InputStream for path ") + path;
+}
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
index 8723344..5202a33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/InputStreamImpl.h
@@ -19,13 +19,10 @@
#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
-#include "platform.h"
-
#include "BlockReader.h"
#include "ExceptionInternal.h"
#include "FileSystem.h"
#include "Hash.h"
-#include "InputStreamInter.h"
#include "LruMap.h"
#include "SessionConfig.h"
#include "SharedPtr.h"
@@ -45,14 +42,16 @@ namespace hdfs {
namespace internal {
typedef std::pair<int64_t, std::string> LocalBlockInforCacheKey;
-typedef LruMap<LocalBlockInforCacheKey, BlockLocalPathInfo> LocalBlockInforCacheType;
+typedef LruMap<LocalBlockInforCacheKey, BlockLocalPathInfo>
+ LocalBlockInforCacheType;
/**
* A input stream used read data from hdfs.
*/
-class InputStreamImpl: public InputStreamInter {
+class InputStreamImpl {
public:
InputStreamImpl();
+
~InputStreamImpl();
/**
@@ -61,22 +60,24 @@ public:
* @param path the file to be read.
* @param verifyChecksum verify the checksum.
*/
- void open(shared_ptr<FileSystemInter> fs, const char * path, bool verifyChecksum);
+ void open(shared_ptr<FileSystemImpl> fs, const char *path,
+ bool verifyChecksum);
/**
* To read data from hdfs.
* @param buf the buffer used to filled.
* @param size buffer size.
- * @return return the number of bytes filled in the buffer, it may less than size.
+ * @return return the number of bytes filled in the buffer, it may less than
+ * size.
*/
- int32_t read(char * buf, int32_t size);
+ int32_t read(char *buf, int32_t size);
/**
* To read data from hdfs, block until get the given size of bytes.
* @param buf the buffer used to filled.
* @param size the number of bytes to be read.
*/
- void readFully(char * buf, int64_t size);
+ void readFully(char *buf, int64_t size);
int64_t available();
@@ -100,23 +101,24 @@ public:
std::string toString();
private:
- BlockLocalPathInfo getBlockLocalPathInfo(LocalBlockInforCacheType & cache,
- const LocatedBlock & b);
+ BlockLocalPathInfo getBlockLocalPathInfo(LocalBlockInforCacheType &cache,
+ const LocatedBlock &b);
bool choseBestNode();
bool isLocalNode();
- int32_t readInternal(char * buf, int32_t size);
- int32_t readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure);
+ int32_t readInternal(char *buf, int32_t size);
+ int32_t readOneBlock(char *buf, int32_t size,
+ bool shouldUpdateMetadataOnFailure);
int64_t getFileLength();
- int64_t readBlockLength(const LocatedBlock & b);
- LocalBlockInforCacheType & getBlockLocalPathInfoCache(uint32_t port);
+ int64_t readBlockLength(const LocatedBlock &b);
+ LocalBlockInforCacheType &getBlockLocalPathInfoCache(uint32_t port);
void checkStatus();
- void invalidCacheEntry(LocalBlockInforCacheType & cache,
- const LocatedBlock & b);
- void openInternal(shared_ptr<FileSystemInter> fs, const char * path,
+ void invalidCacheEntry(LocalBlockInforCacheType &cache,
+ const LocatedBlock &b);
+ void openInternal(shared_ptr<FileSystemImpl> fs, const char *path,
bool verifyChecksum);
- void readFullyInternal(char * buf, int64_t size);
+ void readFullyInternal(char *buf, int64_t size);
void seekInternal(int64_t pos);
- void seekToBlock(const LocatedBlock & lb);
+ void seekToBlock(const LocatedBlock &lb);
void setupBlockReader(bool temporaryDisableLocalRead);
void updateBlockInfos();
@@ -135,7 +137,7 @@ private:
int64_t prefetchSize;
RpcAuth auth;
shared_ptr<BlockReader> blockReader;
- shared_ptr<FileSystemInter> filesystem;
+ shared_ptr<FileSystemImpl> filesystem;
shared_ptr<LocatedBlock> curBlock;
shared_ptr<LocatedBlocks> lbs;
shared_ptr<SessionConfig> conf;
@@ -144,28 +146,31 @@ private:
std::vector<char> localReaderBuffer;
static mutex MutLocalBlockInforCache;
- static unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType> > LocalBlockInforCache;
-#ifdef MOCK
+ static unordered_map<uint32_t, shared_ptr<LocalBlockInforCacheType>>
+ LocalBlockInforCache;
+
private:
- hdfs::mock::TestDatanodeStub * stub;
+ InputStreamImpl(const InputStreamImpl &other);
+ InputStreamImpl &operator=(const InputStreamImpl &other);
+
+#ifdef MOCK
+ hdfs::mock::TestDatanodeStub *stub;
#endif
};
-
}
}
#ifdef NEED_BOOST
namespace boost {
-template<>
+template <>
struct hash<hdfs::internal::LocalBlockInforCacheKey> {
std::size_t operator()(
- const hdfs::internal::LocalBlockInforCacheKey & key) const {
+ const hdfs::internal::LocalBlockInforCacheKey &key) const {
size_t values[] = {hdfs::internal::Int64Hasher(key.first),
- hdfs::internal::StringHasher(key.second)
- };
- return hdfs::internal::CombineHasher(values,
- sizeof(values) / sizeof(values[0]));
+ hdfs::internal::StringHasher(key.second)};
+ return hdfs::internal::CombineHasher(
+ values, sizeof(values) / sizeof(values[0]));
}
};
}
@@ -173,15 +178,14 @@ struct hash<hdfs::internal::LocalBlockInforCacheKey> {
#else
namespace std {
-template<>
+template <>
struct hash<hdfs::internal::LocalBlockInforCacheKey> {
std::size_t operator()(
- const hdfs::internal::LocalBlockInforCacheKey & key) const {
- size_t values[] = { hdfs::internal::Int64Hasher(key.first),
- hdfs::internal::StringHasher(key.second)
- };
- return hdfs::internal::CombineHasher(values,
- sizeof(values) / sizeof(values[0]));
+ const hdfs::internal::LocalBlockInforCacheKey &key) const {
+ size_t values[] = {hdfs::internal::Int64Hasher(key.first),
+ hdfs::internal::StringHasher(key.second)};
+ return hdfs::internal::CombineHasher(
+ values, sizeof(values) / sizeof(values[0]));
}
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
index 6118403..d26745d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/LocalBlockReader.h
@@ -32,11 +32,11 @@
namespace hdfs {
namespace internal {
-class LocalBlockReader: public BlockReader {
+class LocalBlockReader : public BlockReader {
public:
- LocalBlockReader(const BlockLocalPathInfo & info,
- const ExtendedBlock & block, int64_t offset, bool verify,
- SessionConfig & conf, std::vector<char> & buffer);
+ LocalBlockReader(const BlockLocalPathInfo &info, const ExtendedBlock &block,
+ int64_t offset, bool verify, SessionConfig &conf,
+ std::vector<char> &buffer);
~LocalBlockReader();
@@ -55,7 +55,7 @@ public:
* @return return the number of bytes filled in the buffer,
* it may less than size. Return 0 if reach the end of block.
*/
- virtual int32_t read(char * buf, int32_t size);
+ virtual int32_t read(char *buf, int32_t size);
/**
* Move the cursor forward len bytes.
@@ -64,34 +64,36 @@ public:
virtual void skip(int64_t len);
private:
+ LocalBlockReader(const LocalBlockReader &other);
+ LocalBlockReader &operator=(const LocalBlockReader &other);
+
/**
* Fill buffer and verify checksum.
* @param bufferSize The size of buffer.
*/
void readAndVerify(int32_t bufferSize);
- int32_t readInternal(char * buf, int32_t len);
+ int32_t readInternal(char *buf, int32_t len);
private:
- bool verify; //verify checksum or not.
+ bool verify; // verify checksum or not.
const char *pbuffer;
const char *pMetaBuffer;
const ExtendedBlock &block;
int checksumSize;
int chunkSize;
int localBufferSize;
- int position; //point in buffer.
- int size; //data size in buffer.
- int64_t cursor; //point in block.
- int64_t length; //data size of block.
+ int position; // point in buffer.
+ int size; // data size in buffer.
+ int64_t cursor; // point in block.
+ int64_t length; // data size of block.
shared_ptr<Checksum> checksum;
shared_ptr<FileWrapper> dataFd;
shared_ptr<FileWrapper> metaFd;
std::string dataFilePath;
std::string metaFilePath;
- std::vector<char> & buffer;
+ std::vector<char> &buffer;
std::vector<char> metaBuffer;
};
-
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
index 7598344..dd69b8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Packet.h
@@ -29,11 +29,10 @@ namespace internal {
class ConstPacketBuffer {
public:
- ConstPacketBuffer(const char * buf, int size) :
- buffer(buf), size(size) {
+ ConstPacketBuffer(const char *buf, int size) : buffer(buf), size(size) {
}
- const char * getBuffer() const {
+ const char *getBuffer() const {
return buffer;
}
@@ -42,7 +41,7 @@ public:
}
private:
- const char * buffer;
+ const char *buffer;
const int size;
};
@@ -65,13 +64,15 @@ public:
/**
* create a new packet
*/
- Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize);
+ Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno,
+ int checksumSize);
- void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize);
+ void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
+ int64_t seqno, int checksumSize);
void addChecksum(uint32_t checksum);
- void addData(const char * buf, int size);
+ void addData(const char *buf, int size);
void setSyncFlag(bool sync);
@@ -102,21 +103,20 @@ public:
}
private:
- bool lastPacketInBlock; // is this the last packet in block
- bool syncBlock; // sync block to disk?
+ bool lastPacketInBlock; // is this the last packet in block
+ bool syncBlock; // sync block to disk?
int checksumPos;
int checksumSize;
int checksumStart;
int dataPos;
int dataStart;
int headerStart;
- int maxChunks; // max chunks in packet
- int numChunks; // number of chunks currently in packet
- int64_t offsetInBlock; // offset in block
- int64_t seqno; // sequence number of packet in block
+ int maxChunks; // max chunks in packet
+ int numChunks; // number of chunks currently in packet
+ int64_t offsetInBlock; // offset in block
+ int64_t seqno; // sequence number of packet in block
std::vector<char> buffer;
};
-
}
}
#endif /* _HDFS_LIBHDFS3_CLIENT_PACKET_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
index f8447b8..6d46a7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/PacketHeader.h
@@ -28,6 +28,8 @@ namespace internal {
class PacketHeader {
public:
+ static int GetPkgHeaderSize();
+ static int CalcPkgHeaderSize();
PacketHeader();
PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno,
bool lastPacketInBlock, int dataLen);
@@ -37,24 +39,19 @@ public:
int getPacketLen();
int64_t getOffsetInBlock();
int64_t getSeqno();
- void readFields(const char * buf, size_t size);
+ void readFields(const char *buf, size_t size);
+
/**
* Write the header into the buffer.
* This requires that PKT_HEADER_LEN bytes are available.
*/
- void writeInBuffer(char * buf, size_t size);
-
-public:
- static int GetPkgHeaderSize();
- static int CalcPkgHeaderSize();
+ void writeInBuffer(char *buf, size_t size);
private:
static int PkgHeaderSize;
-private:
int32_t packetLen;
PacketHeaderProto proto;
};
-
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
index 548118b..2a29acb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
@@ -35,7 +35,7 @@
namespace hdfs {
namespace internal {
-class RemoteBlockReader: public BlockReader {
+class RemoteBlockReader : public BlockReader {
public:
RemoteBlockReader(const ExtendedBlock &eb, DatanodeInfo &datanode,
int64_t start, int64_t len, const Token &token,
@@ -65,6 +65,8 @@ public:
virtual void skip(int64_t len);
private:
+ RemoteBlockReader(const RemoteBlockReader &other);
+ RemoteBlockReader &operator=(RemoteBlockReader &other);
bool readTrailingEmptyPacket();
shared_ptr<PacketHeader> readPacketHeader();
void checkResponse();
@@ -72,20 +74,19 @@ private:
void sendStatus();
void verifyChecksum(int chunks);
-private:
- bool verify; //verify checksum or not.
+ bool verify; // verify checksum or not.
DatanodeInfo &datanode;
const ExtendedBlock &binfo;
int checksumSize;
int chunkSize;
int connTimeout;
- int position; //point in buffer.
+ int position; // point in buffer.
int readTimeout;
- int size; //data size in buffer.
+ int size; // data size in buffer.
int writeTimeout;
- int64_t cursor; //point in block.
- int64_t endOffset; //offset in block requested to read to.
- int64_t lastSeqNo; //segno of the last chunk received
+ int64_t cursor; // point in block.
+ int64_t endOffset; // offset in block requested to read to.
+ int64_t lastSeqNo; // segno of the last chunk received
shared_ptr<BufferedSocketReader> in;
shared_ptr<Checksum> checksum;
shared_ptr<DataTransferProtocol> sender;
@@ -93,7 +94,6 @@ private:
shared_ptr<Socket> sock;
std::vector<char> buffer;
};
-
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h
new file mode 100644
index 0000000..e00baa8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/TokenInternal.h
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_
+#define _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_
+
+#include "Hash.h"
+#include "Token.h"
+
+// NOTE(review): HDFS_HASH_DEFINE is not defined in this header (presumably
+// it comes from Hash.h); it appears to make Token usable as a key in hash
+// containers -- confirm against the macro definition.
+HDFS_HASH_DEFINE(::hdfs::internal::Token);
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_TOKENINTERNAL_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc
new file mode 100644
index 0000000..3e1e841
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.cc
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Config.h"
+#include "ConfigImpl.h"
+#include "XmlConfigParser.h"
+#include "StatusInternal.h"
+
+#include <sstream>
+#include <string>
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+
+/**
+ * Build a Config from the key/value pairs that XmlConfigParser reads from
+ * the file at the given path.
+ */
+Config Config::CreateFromXmlFile(const std::string &path) {
+    XmlConfigParser parser(path.c_str());
+    return Config(new ConfigImpl(parser.getKeyValue()));
+}
+
+/** Create a Config backed by a default-constructed ConfigImpl. */
+Config::Config() {
+    impl = new ConfigImpl;
+}
+
+/** Copy constructor: takes a private copy of the other instance's ConfigImpl. */
+Config::Config(const Config &other) : impl(new ConfigImpl(*other.impl)) {
+}
+
+/** Wrap an existing ConfigImpl; the destructor of this Config deletes it. */
+Config::Config(ConfigImpl *impl) {
+    this->impl = impl;
+}
+
+/**
+ * Assignment operator: replaces the held ConfigImpl with a copy of the
+ * other instance's one. Self-assignment is a no-op.
+ */
+Config &Config::operator=(const Config &other) {
+    if (this != &other) {
+        // The old object is deleted only after the copy has been created.
+        ConfigImpl *previous = impl;
+        impl = new ConfigImpl(*other.impl);
+        delete previous;
+    }
+
+    return *this;
+}
+
+/**
+ * Equality: true for the same object, otherwise the result of comparing
+ * the two underlying ConfigImpl objects.
+ */
+bool Config::operator==(const Config &other) const {
+    return this == &other || *impl == *other.impl;
+}
+
+/**
+ * Destroy this instance and delete the ConfigImpl it holds.
+ */
+Config::~Config() {
+ delete impl;
+}
+
+/**
+ * Fetch the string stored under key into *output.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getString(const std::string &key, std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+    const char *name = key.c_str();
+
+    try {
+        *output = impl->getString(name);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Fetch the string stored under key into *output, passing def to the
+ * underlying lookup as the default value.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getString(const std::string &key, const std::string &def,
+                         std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+    const char *name = key.c_str();
+    const char *fallback = def.c_str();
+
+    try {
+        *output = impl->getString(name, fallback);
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the 64 bit integer stored under key and write it to *output as
+ * decimal text.
+ * @param key the key of the configure item.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getInt64(const std::string &key, std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the number directly would select
+        // std::string::operator=(char) and keep one truncated character.
+        std::ostringstream text;
+        text << impl->getInt64(key.c_str());
+        *output = text.str();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the 64 bit integer stored under key, passing def to the
+ * underlying lookup as the default, and write the result to *output as
+ * decimal text.
+ * @param key the key of the configure item.
+ * @param def the default value handed to the lookup.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getInt64(const std::string &key, int64_t def,
+                        std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the number directly would select
+        // std::string::operator=(char) and keep one truncated character.
+        std::ostringstream text;
+        text << impl->getInt64(key.c_str(), def);
+        *output = text.str();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the 32 bit integer stored under key and write it to *output as
+ * decimal text.
+ * @param key the key of the configure item.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getInt32(const std::string &key, std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the number directly would select
+        // std::string::operator=(char) and keep one truncated character.
+        std::ostringstream text;
+        text << impl->getInt32(key.c_str());
+        *output = text.str();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the 32 bit integer stored under key, passing def to the
+ * underlying lookup as the default, and write the result to *output as
+ * decimal text.
+ * @param key the key of the configure item.
+ * @param def the default value handed to the lookup.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getInt32(const std::string &key, int32_t def,
+                        std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the number directly would select
+        // std::string::operator=(char) and keep one truncated character.
+        std::ostringstream text;
+        text << impl->getInt32(key.c_str(), def);
+        *output = text.str();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the double stored under key and write it to *output as text.
+ * @param key the key of the configure item.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getDouble(const std::string &key, std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the number directly would select
+        // std::string::operator=(char) and keep one truncated character.
+        std::ostringstream text;
+        // 17 significant digits let a 64 bit IEEE-754 double round-trip.
+        text.precision(17);
+        text << impl->getDouble(key.c_str());
+        *output = text.str();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the double stored under key, passing def to the underlying
+ * lookup as the default, and write the result to *output as text.
+ * @param key the key of the configure item.
+ * @param def the default value handed to the lookup.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getDouble(const std::string &key, double def,
+                         std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the number directly would select
+        // std::string::operator=(char) and keep one truncated character.
+        std::ostringstream text;
+        // 17 significant digits let a 64 bit IEEE-754 double round-trip.
+        text.precision(17);
+        text << impl->getDouble(key.c_str(), def);
+        *output = text.str();
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the boolean stored under key and write "true" or "false" to
+ * *output.
+ * @param key the key of the configure item.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getBool(const std::string &key, std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the bool directly would select
+        // std::string::operator=(char) and store the character 0 or 1.
+        *output = impl->getBool(key.c_str()) ? "true" : "false";
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Look up the boolean stored under key, passing def to the underlying
+ * lookup as the default, and write "true" or "false" to *output.
+ * @param key the key of the configure item.
+ * @param def the default value handed to the lookup.
+ * @param output receives the textual value; must not be NULL.
+ * @return Status::OK() on success, otherwise the status built from the
+ * exception raised by the lookup.
+ */
+Status Config::getBool(const std::string &key, bool def,
+                       std::string *output) const {
+    CHECK_PARAMETER(NULL != output, EINVAL, "invalid parameter \"output\"");
+
+    try {
+        // Assigning the bool directly would select
+        // std::string::operator=(char) and store the character 0 or 1.
+        *output = impl->getBool(key.c_str(), def) ? "true" : "false";
+    } catch (...) {
+        return CreateStatusFromException(current_exception());
+    }
+
+    return Status::OK();
+}
+
+/** Store a string value under key by forwarding to ConfigImpl::set. */
+void Config::set(const std::string &key, const std::string &value) {
+    const char *name = key.c_str();
+    impl->set(name, value);
+}
+
+/** Store a 32 bit integer under key by forwarding to ConfigImpl::set. */
+void Config::set(const std::string &key, int32_t value) {
+    const char *name = key.c_str();
+    impl->set(name, value);
+}
+
+/** Store a 64 bit integer under key by forwarding to ConfigImpl::set. */
+void Config::set(const std::string &key, int64_t value) {
+    const char *name = key.c_str();
+    impl->set(name, value);
+}
+
+/** Store a double under key by forwarding to ConfigImpl::set. */
+void Config::set(const std::string &key, double value) {
+    const char *name = key.c_str();
+    impl->set(name, value);
+}
+
+/** Store a boolean under key by forwarding to ConfigImpl::set. */
+void Config::set(const std::string &key, bool value) {
+    const char *name = key.c_str();
+    impl->set(name, value);
+}
+
+/** Return the hash value reported by the underlying ConfigImpl. */
+size_t Config::hash_value() const {
+    const size_t digest = impl->hash_value();
+    return digest;
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h
new file mode 100644
index 0000000..4359e3f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/Config.h
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_COMMON_CONFIG_H_
+#define _HDFS_LIBHDFS3_COMMON_CONFIG_H_
+
+#include "Status.h"
+
+#include <stdint.h>
+#include <string>
+#include <map>
+
+namespace hdfs {
+
+class FileSystem;
+class NamenodeInfo;
+
+namespace internal {
+class ConfigImpl;
+class FileSystemImpl;
+}
+
+/**
+ * Configuration container; it can be created from an XML file or filled
+ * item by item with set().
+ */
+class Config {
+public:
+ /**
+ * Create an instance from an XML file
+ * @param path the path of the configure file.
+ */
+ static Config CreateFromXmlFile(const std::string &path);
+
+ /**
+ * Construct an empty Config instance.
+ */
+ Config();
+
+ /**
+ * Copy constructor
+ */
+ Config(const Config &other);
+
+ /**
+ * Assignment operator.
+ */
+ Config &operator=(const Config &other);
+
+ /**
+ * Operator equal
+ */
+ bool operator==(const Config &other) const;
+
+ /**
+ * Destroy this instance
+ */
+ ~Config();
+
+ /**
+ * Get a string with given configure key.
+ * @param key The key of the configure item.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getString(const std::string &key, std::string *output) const;
+
+ /**
+ * Get a string with given configure key.
+ * Return the default value def if key is not found.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getString(const std::string &key, const std::string &def,
+ std::string *output) const;
+
+ /**
+ * Get a 64 bit integer with given configure key.
+ * NOTE(review): output is declared as std::string* here and on the other
+ * integer, double and boolean getters below -- confirm this is intended.
+ * @param key The key of the configure item.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getInt64(const std::string &key, std::string *output) const;
+
+ /**
+ * Get a 64 bit integer with given configure key.
+ * Return the default value def if key is not found.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getInt64(const std::string &key, int64_t def,
+ std::string *output) const;
+
+ /**
+ * Get a 32 bit integer with given configure key.
+ * @param key The key of the configure item.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getInt32(const std::string &key, std::string *output) const;
+
+ /**
+ * Get a 32 bit integer with given configure key.
+ * Return the default value def if key is not found.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getInt32(const std::string &key, int32_t def,
+ std::string *output) const;
+
+ /**
+ * Get a double with given configure key.
+ * @param key The key of the configure item.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getDouble(const std::string &key, std::string *output) const;
+
+ /**
+ * Get a double with given configure key.
+ * Return the default value def if key is not found.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getDouble(const std::string &key, double def,
+ std::string *output) const;
+
+ /**
+ * Get a boolean with given configure key.
+ * @param key The key of the configure item.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getBool(const std::string &key, std::string *output) const;
+
+ /**
+ * Get a boolean with given configure key.
+ * Return the default value def if key is not found.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @param output the pointer of the output parameter.
+ * @return the result status of this operation
+ */
+ Status getBool(const std::string &key, bool def, std::string *output) const;
+
+ /**
+ * Set a configure item
+ * @param key The key to set.
+ * @param value The value to store under the key.
+ */
+ void set(const std::string &key, const std::string &value);
+
+ /**
+ * Set a configure item
+ * @param key The key to set.
+ * @param value The value to store under the key.
+ */
+ void set(const std::string &key, int32_t value);
+
+ /**
+ * Set a configure item
+ * @param key The key to set.
+ * @param value The value to store under the key.
+ */
+ void set(const std::string &key, int64_t value);
+
+ /**
+ * Set a configure item
+ * @param key The key to set.
+ * @param value The value to store under the key.
+ */
+ void set(const std::string &key, double value);
+
+ /**
+ * Set a configure item
+ * @param key The key to set.
+ * @param value The value to store under the key.
+ */
+ void set(const std::string &key, bool value);
+
+ /**
+ * Get the hash value of this object
+ * @return The hash value
+ */
+ size_t hash_value() const;
+
+private:
+ Config(hdfs::internal::ConfigImpl *impl);
+ hdfs::internal::ConfigImpl *impl;
+ friend class hdfs::FileSystem;
+ friend class hdfs::internal::FileSystemImpl;
+ friend class hdfs::NamenodeInfo;
+};
+}
+
+#endif /* _HDFS_LIBHDFS3_COMMON_CONFIG_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
new file mode 100644
index 0000000..7bfb3ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.cc
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Hash.h"
+#include "ConfigImpl.h"
+
+#include <cassert>
+#include <errno.h>
+#include <fstream>
+#include <limits>
+#include <string.h>
+#include <unistd.h>
+#include <vector>
+
+using std::map;
+using std::string;
+using std::vector;
+
+namespace hdfs {
+namespace internal {
+
+typedef map<string, string>::const_iterator Iterator; // read-only cursor over Map
+typedef map<string, string> Map; // string key -> string value table
+
+/**
+ * Parse a string as a 32 bit signed integer.
+ *
+ * The text is handed to strtol with base 0, so the base is taken from the
+ * prefix of the text itself.
+ *
+ * @param str The NUL-terminated text to parse.
+ * @return The parsed value.
+ * @throw HdfsBadNumFoumat if str contains no number, has trailing
+ * characters after the number, or does not fit in an int32_t.
+ */
+static int32_t StrToInt32(const char *str) {
+    char *end = NULL;
+    errno = 0;
+    const long retval = strtol(str, &end, 0);
+
+    // end == str means nothing was converted (for example an empty string);
+    // strtol then returns 0 and is not required to set errno, so it has to
+    // be rejected explicitly instead of being accepted as the value 0.
+    if (EINVAL == errno || end == str || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid int32_t type: %s", str);
+    }
+
+    // long may be wider than 32 bits, so check the int32_t range as well as
+    // the overflow reported by strtol itself.
+    if (ERANGE == errno || retval > std::numeric_limits<int32_t>::max() ||
+        retval < std::numeric_limits<int32_t>::min()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow int32_t type: %s", str);
+    }
+
+    return static_cast<int32_t>(retval);
+}
+
+/**
+ * Parse a string as a 64 bit signed integer.
+ *
+ * The text is handed to strtoll with base 0, so the base is taken from the
+ * prefix of the text itself.
+ *
+ * @param str The NUL-terminated text to parse.
+ * @return The parsed value.
+ * @throw HdfsBadNumFoumat if str contains no number, has trailing
+ * characters after the number, or does not fit in an int64_t.
+ */
+static int64_t StrToInt64(const char *str) {
+    char *end = NULL;
+    errno = 0;
+    const long long retval = strtoll(str, &end, 0);
+
+    // end == str means nothing was converted (for example an empty string);
+    // strtoll then returns 0 and is not required to set errno, so it has to
+    // be rejected explicitly instead of being accepted as the value 0.
+    if (EINVAL == errno || end == str || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid int64_t type: %s", str);
+    }
+
+    if (ERANGE == errno || retval > std::numeric_limits<int64_t>::max() ||
+        retval < std::numeric_limits<int64_t>::min()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow int64_t type: %s", str);
+    }
+
+    return static_cast<int64_t>(retval);
+}
+
+/**
+ * Parse a string as a boolean.
+ *
+ * "true" and "false" are matched without regard to case; "1" and "0" are
+ * accepted as well.
+ *
+ * @param str The NUL-terminated text to parse.
+ * @return The parsed value.
+ * @throw HdfsBadBoolFoumat if str is none of the accepted spellings.
+ */
+static bool StrToBool(const char *str) {
+    const bool saysTrue = 0 == strcasecmp(str, "true") || 0 == strcmp(str, "1");
+    const bool saysFalse =
+        0 == strcasecmp(str, "false") || 0 == strcmp(str, "0");
+
+    if (!saysTrue && !saysFalse) {
+        THROW(HdfsBadBoolFoumat, "Invalid bool type: %s", str);
+    }
+
+    return saysTrue;
+}
+
+/**
+ * Parse a string as a double.
+ *
+ * @param str The NUL-terminated text to parse.
+ * @return The parsed value.
+ * @throw HdfsBadNumFoumat if str contains no number, has trailing
+ * characters after the number, or is outside the finite range of double.
+ */
+static double StrToDouble(const char *str) {
+    char *end = NULL;
+    errno = 0;
+    const double retval = strtod(str, &end);
+
+    // end == str means nothing was converted (for example an empty string);
+    // strtod then returns 0 and is not required to set errno.
+    if (EINVAL == errno || end == str || 0 != *end) {
+        THROW(HdfsBadNumFoumat, "Invalid double type: %s", str);
+    }
+
+    // numeric_limits<double>::min() is the smallest POSITIVE normal value,
+    // so comparing against it would reject 0.0 and every negative number.
+    // The most negative finite double is -max().
+    if (ERANGE == errno || retval > std::numeric_limits<double>::max() ||
+        retval < -std::numeric_limits<double>::max()) {
+        THROW(HdfsBadNumFoumat, "Underflow/Overflow double type: %s", str);
+    }
+
+    return retval;
+}
+
+/**
+ * Construct a configuration holding a copy of the given key/value pairs.
+ * @param kv The key/value pairs to copy.
+ */
+ConfigImpl::ConfigImpl(const Map &kv)
+    : kv(kv) {
+}
+
+/**
+ * Look up a configure item and return its value as a C string.
+ * @param key The key of the configure item.
+ * @return A pointer into the stored value of the item.
+ * @throw HdfsConfigNotFound if the key is absent.
+ */
+const char *ConfigImpl::getString(const std::string &key) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    const std::string &stored = found->second;
+    return stored.c_str();
+}
+
+/**
+ * Look up a configure item and return its value as a C string, falling
+ * back to a default when the key is absent.
+ *
+ * In the fallback case the returned pointer is def.c_str(), so it is only
+ * usable for as long as the caller's def argument is.
+ *
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @return A pointer into the stored value, or into def if key is absent.
+ */
+const char *ConfigImpl::getString(const std::string &key,
+                                  const std::string &def) const {
+    const Iterator found = kv.find(key.c_str());
+    return kv.end() == found ? def.c_str() : found->second.c_str();
+}
+
+/**
+ * Look up a configure item and convert its value to a 64 bit integer.
+ * @param key The key of the configure item.
+ * @return The converted value.
+ * @throw HdfsConfigNotFound if the key is absent. A value that cannot be
+ * converted is also reported as HdfsConfigNotFound, raised through
+ * NESTED_THROW from the HdfsBadNumFoumat handler.
+ */
+int64_t ConfigImpl::getInt64(const std::string &key) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    int64_t value;
+
+    try {
+        value = StrToInt64(found->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return value;
+}
+
+/**
+ * Look up a configure item and convert its value to a 64 bit integer,
+ * falling back to a default when the key is absent.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @return The converted value, or def if key is absent.
+ * @throw HdfsConfigNotFound (through NESTED_THROW) if the item exists but
+ * its value cannot be converted; def is not used in that case.
+ */
+int64_t ConfigImpl::getInt64(const std::string &key, int64_t def) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        return def;
+    }
+
+    int64_t value;
+
+    try {
+        value = StrToInt64(found->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return value;
+}
+
+/**
+ * Look up a configure item and convert its value to a 32 bit integer.
+ * @param key The key of the configure item.
+ * @return The converted value.
+ * @throw HdfsConfigNotFound if the key is absent. A value that cannot be
+ * converted is also reported as HdfsConfigNotFound, raised through
+ * NESTED_THROW from the HdfsBadNumFoumat handler.
+ */
+int32_t ConfigImpl::getInt32(const std::string &key) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    int32_t value;
+
+    try {
+        value = StrToInt32(found->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return value;
+}
+
+/**
+ * Look up a configure item and convert its value to a 32 bit integer,
+ * falling back to a default when the key is absent.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @return The converted value, or def if key is absent.
+ * @throw HdfsConfigNotFound (through NESTED_THROW) if the item exists but
+ * its value cannot be converted; def is not used in that case.
+ */
+int32_t ConfigImpl::getInt32(const std::string &key, int32_t def) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        return def;
+    }
+
+    int32_t value;
+
+    try {
+        value = StrToInt32(found->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return value;
+}
+
+/**
+ * Look up a configure item and convert its value to a double.
+ * @param key The key of the configure item.
+ * @return The converted value.
+ * @throw HdfsConfigNotFound if the key is absent. A value that cannot be
+ * converted is also reported as HdfsConfigNotFound, raised through
+ * NESTED_THROW from the HdfsBadNumFoumat handler.
+ */
+double ConfigImpl::getDouble(const std::string &key) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    double value;
+
+    try {
+        value = StrToDouble(found->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return value;
+}
+
+/**
+ * Look up a configure item and convert its value to a double, falling
+ * back to a default when the key is absent.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @return The converted value, or def if key is absent.
+ * @throw HdfsConfigNotFound (through NESTED_THROW) if the item exists but
+ * its value cannot be converted; def is not used in that case.
+ */
+double ConfigImpl::getDouble(const std::string &key, double def) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        return def;
+    }
+
+    double value;
+
+    try {
+        value = StrToDouble(found->second.c_str());
+    } catch (const HdfsBadNumFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return value;
+}
+
+/**
+ * Look up a configure item and convert its value to a boolean.
+ * @param key The key of the configure item.
+ * @return The converted value.
+ * @throw HdfsConfigNotFound if the key is absent. A value that cannot be
+ * converted is also reported as HdfsConfigNotFound, raised through
+ * NESTED_THROW from the HdfsBadBoolFoumat handler.
+ */
+bool ConfigImpl::getBool(const std::string &key) const {
+    const Iterator found = kv.find(key.c_str());
+
+    if (found == kv.end()) {
+        THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found", key.c_str());
+    }
+
+    bool value;
+
+    try {
+        value = StrToBool(found->second.c_str());
+    } catch (const HdfsBadBoolFoumat &e) {
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return value;
+}
+
+/**
+ * Look up a configure item and convert its value to a boolean, falling
+ * back to a default when the key is absent.
+ * @param key The key of the configure item.
+ * @param def The default value.
+ * @return The converted value, or def if key is absent.
+ * @throw HdfsConfigNotFound (through NESTED_THROW) if the item exists but
+ * its value cannot be converted; def is not used in that case.
+ */
+bool ConfigImpl::getBool(const std::string &key, bool def) const {
+    bool retval;
+    Iterator it = kv.find(key.c_str());
+
+    if (kv.end() == it) {
+        return def;
+    }
+
+    try {
+        retval = StrToBool(it->second.c_str());
+    } catch (const HdfsBadBoolFoumat &e) {
+        // StrToBool signals a malformed value with HdfsBadBoolFoumat, not
+        // HdfsBadNumFoumat; catching the latter left this handler dead and
+        // let the raw exception escape, unlike getBool(key).
+        NESTED_THROW(HdfsConfigNotFound, "ConfigImpl key: %s not found",
+                     key.c_str());
+    }
+
+    return retval;
+}
+
+/**
+ * Compute a hash over every key and value held by this configuration.
+ *
+ * Each key and each value is hashed with StringHasher, in map iteration
+ * order, and the individual hashes are merged with CombineHasher.
+ *
+ * @return The combined hash value.
+ */
+size_t ConfigImpl::hash_value() const {
+    vector<size_t> values;
+    values.reserve(kv.size() * 2);
+
+    for (Iterator it = kv.begin(); it != kv.end(); ++it) {
+        values.push_back(StringHasher(it->first));
+        values.push_back(StringHasher(it->second));
+    }
+
+    // Indexing element 0 of an empty vector is undefined behaviour, so an
+    // empty configuration passes a null pointer together with a count of 0.
+    // NOTE(review): assumes CombineHasher does not dereference the pointer
+    // when the count is 0 -- confirm against Hash.h.
+    size_t *first = values.empty() ? NULL : &values[0];
+    return CombineHasher(first, values.size());
+}
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e559ce04/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
new file mode 100644
index 0000000..5d058dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/ConfigImpl.h
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_COMMON_XMLCONFIGIMPL_H_
+#define _HDFS_LIBHDFS3_COMMON_XMLCONFIGIMPL_H_
+
+#include <stdint.h>
+#include <string>
+#include <sstream>
+#include <map>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * An in-memory table of configure items. Every value is stored as a string
+ * and converted to the requested type by the typed getters.
+ */
+class ConfigImpl {
+public:
+    /**
+     * Construct an empty configuration.
+     */
+    ConfigImpl() {
+    }
+
+    /**
+     * Construct a configuration from existing key/value pairs.
+     * @param kv The key/value pairs to start with.
+     */
+    ConfigImpl(const std::map<std::string, std::string> &kv);
+
+    /**
+     * Compare two configurations.
+     * @param other The configuration to compare with.
+     * @return true if both hold the same key/value pairs.
+     */
+    bool operator==(const ConfigImpl &other) const {
+        // An object always equals itself; otherwise compare the tables.
+        return this == &other || kv == other.kv;
+    }
+
+    /**
+     * Get the string value of a configure item.
+     * @param key The key of the configure item.
+     * @return The value of the configure item.
+     * @throw HdfsConfigNotFound
+     */
+    const char *getString(const std::string &key) const;
+
+    /**
+     * Get the string value of a configure item, or a default value if the
+     * key is not found.
+     * @param key The key of the configure item.
+     * @param def The default value.
+     * @return The value of the configure item.
+     */
+    const char *getString(const std::string &key, const std::string &def) const;
+
+    /**
+     * Get the value of a configure item as a 64 bit integer.
+     * @param key The key of the configure item.
+     * @return The value of the configure item.
+     * @throw HdfsConfigNotFound
+     */
+    int64_t getInt64(const std::string &key) const;
+
+    /**
+     * Get the value of a configure item as a 64 bit integer, or a default
+     * value if the key is not found.
+     * @param key The key of the configure item.
+     * @param def The default value.
+     * @return The value of the configure item.
+     */
+    int64_t getInt64(const std::string &key, int64_t def) const;
+
+    /**
+     * Get the value of a configure item as a 32 bit integer.
+     * @param key The key of the configure item.
+     * @return The value of the configure item.
+     * @throw HdfsConfigNotFound
+     */
+    int32_t getInt32(const std::string &key) const;
+
+    /**
+     * Get the value of a configure item as a 32 bit integer, or a default
+     * value if the key is not found.
+     * @param key The key of the configure item.
+     * @param def The default value.
+     * @return The value of the configure item.
+     */
+    int32_t getInt32(const std::string &key, int32_t def) const;
+
+    /**
+     * Get the value of a configure item as a double.
+     * @param key The key of the configure item.
+     * @return The value of the configure item.
+     * @throw HdfsConfigNotFound
+     */
+    double getDouble(const std::string &key) const;
+
+    /**
+     * Get the value of a configure item as a double, or a default value if
+     * the key is not found.
+     * @param key The key of the configure item.
+     * @param def The default value.
+     * @return The value of the configure item.
+     */
+    double getDouble(const std::string &key, double def) const;
+
+    /**
+     * Get the value of a configure item as a boolean.
+     * @param key The key of the configure item.
+     * @return The value of the configure item.
+     * @throw HdfsConfigNotFound
+     */
+    bool getBool(const std::string &key) const;
+
+    /**
+     * Get the value of a configure item as a boolean, or a default value if
+     * the key is not found.
+     * @param key The key of the configure item.
+     * @param def The default value.
+     * @return The value of the configure item.
+     */
+    bool getBool(const std::string &key, bool def) const;
+
+    /**
+     * Set a configure item, replacing any value already stored for the key.
+     * The value is converted to its stored string form with operator<<.
+     * @param key The key of the configure item to set.
+     * @param value The value to assign to the key.
+     */
+    template <typename T>
+    void set(const std::string &key, T const &value) {
+        std::ostringstream text;
+        text << value;
+        kv[key] = text.str();
+    }
+
+    /**
+     * Get the hash value of this object.
+     *
+     * @return The hash value.
+     */
+    size_t hash_value() const;
+
+private:
+    // NOTE(review): not read or written by any member declared in this class.
+    std::string path;
+    // The configure items, keyed by name.
+    std::map<std::string, std::string> kv;
+};
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_COMMON_XMLCONFIGIMPL_H_ */