You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [10/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Combiner.h"
+#include "StringUtil.h"
+
+namespace NativeTask {
+
+/**
+ * Builds a combine runner around the factory function that produces the
+ * user's combiner object.
+ *
+ * Throws UnsupportException when no factory function is supplied.
+ */
+NativeCombineRunner::NativeCombineRunner(Config * config, ObjectCreatorFunc combinerCreator)
+    : _config(config), _combinerCreator(combinerCreator), _keyGroupCount(0) {
+  const bool hasCreator = (_combinerCreator != NULL);
+  if (!hasCreator) {
+    THROW_EXCEPTION_EX(UnsupportException, "Create combiner failed");
+  }
+}
+
+/**
+ * Wraps a plain key/value iterator in a newly allocated KeyGroupIteratorImpl.
+ * The returned object is heap-allocated; the caller receives the raw pointer.
+ */
+KeyGroupIterator * NativeCombineRunner::createKeyGroupIterator(KVIterator * iter) {
+  KeyGroupIterator * grouped = new KeyGroupIteratorImpl(iter);
+  return grouped;
+}
+
+/**
+ * Runs the user combiner over every record produced by iterator and sends
+ * the combiner's output to writer.
+ *
+ * A fresh combiner object is obtained from the creator function on each
+ * call, configured, dispatched on the type it reports (mapper-style or
+ * reducer-style), then closed and deleted.
+ *
+ * Throws UnsupportException when the creator yields NULL or when the object
+ * reports a type other than MapperType / ReducerType.
+ */
+void NativeCombineRunner::combine(CombineContext context, KVIterator * iterator,
+    IFileWriter * writer) {
+  Configurable * combiner = (Configurable *)(_combinerCreator());
+  if (NULL == combiner) {
+    // configure() and type() below dereference the object, so a failed
+    // creation has to be reported here instead of crashing.
+    THROW_EXCEPTION(UnsupportException, "Create combiner failed");
+  }
+  combiner->configure(_config);
+
+  NativeObjectType type = combiner->type();
+  switch (type) {
+  case MapperType: {
+    // Mapper-style combiner: feed it one key/value pair at a time.
+    Mapper * mapper = (Mapper*)combiner;
+    mapper->setCollector(writer);
+
+    Buffer key;
+    Buffer value;
+    while (iterator->next(key, value)) {
+      mapper->map(key.data(), key.length(), value.data(), value.length());
+    }
+    mapper->close();
+    delete mapper;
+  }
+    break;
+  case ReducerType: {
+    // Reducer-style combiner: feed it one key group at a time.
+    Reducer * reducer = (Reducer*)combiner;
+    reducer->setCollector(writer);
+    // NOTE(review): kg is allocated by createKeyGroupIterator() and is not
+    // released anywhere in this function -- confirm ownership and whether
+    // KeyGroupIterator can be deleted here without touching `iterator`.
+    KeyGroupIterator * kg = createKeyGroupIterator(iterator);
+    while (kg->nextKey()) {
+      _keyGroupCount++;
+      reducer->reduce(*kg);
+    }
+    reducer->close();
+    delete reducer;
+  }
+    break;
+  default:
+    // NOTE(review): the combiner object is not deleted on this path.
+    THROW_EXCEPTION(UnsupportException, "Combiner type not support");
+  }
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef COMBINER_H_
+#define COMBINER_H_
+#include "commons.h"
+#include "IFile.h"
+
+namespace NativeTask {
+
+/**
+ * KVIterator extension that additionally exposes a base pointer and a
+ * vector of 32-bit offsets.
+ * NOTE(review): presumably the offsets locate the records relative to
+ * getBase() inside one contiguous buffer -- confirm against implementations.
+ */
+class MemoryBufferKVIterator : public KVIterator {
+public:
+ virtual const char * getBase() = 0;
+ virtual std::vector<uint32_t> * getKVOffsets() = 0;
+};
+
+/**
+ * Tag carried by a CombineContext.
+ * CONTINUOUS_MEMORY_BUFFER is the value used by CombineInMemory.
+ */
+enum CombineContextType {
+ UNKNOWN = 0,
+ CONTINUOUS_MEMORY_BUFFER = 1,
+};
+
+/**
+ * Small value object that records which kind of combine is being requested.
+ * It is passed by value to ICombineRunner::combine().
+ */
+class CombineContext {
+
+private:
+  CombineContextType _type;
+
+public:
+  // Intentionally not explicit: an implicit conversion from
+  // CombineContextType keeps existing call sites compiling.
+  CombineContext(CombineContextType type)
+      : _type(type) {
+  }
+
+public:
+  /** Returns the type tag given at construction. */
+  CombineContextType getType() const {
+    return _type;
+  }
+};
+
+/**
+ * CombineContext preset to CONTINUOUS_MEMORY_BUFFER.
+ */
+class CombineInMemory : public CombineContext {
+public:
+  // The constructor must be public; with the class-default private access
+  // the type could not be instantiated at all.
+  CombineInMemory()
+      : CombineContext(CONTINUOUS_MEMORY_BUFFER) {
+  }
+};
+
+/**
+ * Abstract interface for objects that can run a combine pass: records are
+ * read from kvIterator and the result is handed to writer.
+ */
+class ICombineRunner {
+public:
+ ICombineRunner() {
+ }
+
+ virtual void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) = 0;
+
+ // Virtual so implementations can be destroyed through this interface.
+ virtual ~ICombineRunner() {
+ }
+};
+
+/**
+ * ICombineRunner that creates the combiner through an ObjectCreatorFunc
+ * and drives it natively (see Combiner.cc).
+ */
+class NativeCombineRunner : public ICombineRunner {
+private:
+ Config * _config; // passed to the combiner's configure()
+ ObjectCreatorFunc _combinerCreator; // factory invoked on every combine() call
+ uint32_t _keyGroupCount; // incremented once per key group in the reducer-style path
+
+public:
+ NativeCombineRunner(Config * config, ObjectCreatorFunc objectCreator);
+
+public:
+ void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer);
+
+private:
+ KeyGroupIterator * createKeyGroupIterator(KVIterator * iter);
+};
+
+} /* namespace NativeTask */
+#endif /* COMBINER_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "SyncUtils.h"
+#include "Compressions.h"
+#include "codec/GzipCodec.h"
+#include "codec/SnappyCodec.h"
+#include "codec/Lz4Codec.h"
+
+namespace NativeTask {
+
+// Out-of-line definition of the virtual destructor; nothing to release here.
+CompressStream::~CompressStream() {
+}
+
+/**
+ * Default implementation: always throws UnsupportException.
+ * The method is virtual, so subclasses may override it.
+ */
+void CompressStream::writeDirect(const void * buff, uint32_t length) {
+ THROW_EXCEPTION(UnsupportException, "writeDirect not support");
+}
+
+///////////////////////////////////////////////////////////
+
+// Out-of-line definition of the virtual destructor; nothing to release here.
+DecompressStream::~DecompressStream() {
+}
+
+/**
+ * Default implementation: always throws UnsupportException, so no value is
+ * ever returned. The method is virtual, so subclasses may override it.
+ */
+int32_t DecompressStream::readDirect(void * buff, uint32_t length) {
+ THROW_EXCEPTION(UnsupportException, "readDirect not support");
+}
+
+///////////////////////////////////////////////////////////
+
+// Codec descriptors: Hadoop codec class name paired with its file extension.
+const Compressions::Codec Compressions::GzipCodec = Compressions::Codec(
+ "org.apache.hadoop.io.compress.GzipCodec", ".gz");
+const Compressions::Codec Compressions::SnappyCodec = Compressions::Codec(
+ "org.apache.hadoop.io.compress.SnappyCodec", ".snappy");
+const Compressions::Codec Compressions::Lz4Codec = Compressions::Codec(
+ "org.apache.hadoop.io.compress.Lz4Codec", ".lz4");
+
+// Starts empty; filled on first use by initCodecs().
+vector<Compressions::Codec> Compressions::SupportedCodecs = vector<Compressions::Codec>();
+
+/**
+ * Fills SupportedCodecs with the three built-in codecs the first time it is
+ * called; later calls find the list non-empty and do nothing. The check and
+ * the fill run under a function-local lock.
+ */
+void Compressions::initCodecs() {
+  static Lock lock;
+  ScopeLock<Lock> autolock(lock);
+  if (!SupportedCodecs.empty()) {
+    return;
+  }
+  SupportedCodecs.push_back(GzipCodec);
+  SupportedCodecs.push_back(SnappyCodec);
+  SupportedCodecs.push_back(Lz4Codec);
+}
+
+/**
+ * Returns true when codec equals the name of one of the supported codecs.
+ */
+bool Compressions::support(const string & codec) {
+  initCodecs();
+  bool found = false;
+  for (vector<Codec>::const_iterator it = SupportedCodecs.begin();
+      !found && it != SupportedCodecs.end(); ++it) {
+    found = (it->name == codec);
+  }
+  return found;
+}
+
+/**
+ * Returns the file extension registered for the codec name, or an empty
+ * string when the codec is not in the supported list.
+ */
+const string Compressions::getExtension(const string & codec) {
+  initCodecs();
+  for (vector<Codec>::const_iterator it = SupportedCodecs.begin();
+      it != SupportedCodecs.end(); ++it) {
+    if (it->name == codec) {
+      return it->extension;
+    }
+  }
+  return string();
+}
+
+/**
+ * Returns the codec name registered for the file extension, or an empty
+ * string when no supported codec uses that extension.
+ */
+const string Compressions::getCodec(const string & extension) {
+  initCodecs();
+  for (vector<Codec>::const_iterator it = SupportedCodecs.begin();
+      it != SupportedCodecs.end(); ++it) {
+    if (it->extension == extension) {
+      return it->name;
+    }
+  }
+  return string();
+}
+
+/**
+ * Returns the name of the first supported codec whose extension is a proper
+ * suffix of file (the file name must be strictly longer than the
+ * extension), or an empty string when none matches.
+ */
+const string Compressions::getCodecByFile(const string & file) {
+  initCodecs();
+  const size_t fileLen = file.length();
+  for (vector<Codec>::const_iterator it = SupportedCodecs.begin();
+      it != SupportedCodecs.end(); ++it) {
+    const size_t extLen = it->extension.length();
+    if (fileLen <= extLen) {
+      continue;
+    }
+    if (file.compare(fileLen - extLen, extLen, it->extension) == 0) {
+      return it->name;
+    }
+  }
+  return string();
+}
+
+/**
+ * Creates a compressing stream on top of stream for the named codec.
+ * Returns a newly allocated stream, or NULL when the codec name is not one
+ * of Gzip / Snappy / Lz4.
+ */
+CompressStream * Compressions::getCompressionStream(const string & codec, OutputStream * stream,
+    uint32_t bufferSizeHint) {
+  CompressStream * result = NULL;
+  if (codec == GzipCodec.name) {
+    result = new GzipCompressStream(stream, bufferSizeHint);
+  } else if (codec == SnappyCodec.name) {
+    result = new SnappyCompressStream(stream, bufferSizeHint);
+  } else if (codec == Lz4Codec.name) {
+    result = new Lz4CompressStream(stream, bufferSizeHint);
+  }
+  return result;
+}
+
+/**
+ * Creates a decompressing stream on top of stream for the named codec.
+ * Returns a newly allocated stream, or NULL when the codec name is not one
+ * of Gzip / Snappy / Lz4.
+ */
+DecompressStream * Compressions::getDecompressionStream(const string & codec, InputStream * stream,
+    uint32_t bufferSizeHint) {
+  DecompressStream * result = NULL;
+  if (codec == GzipCodec.name) {
+    result = new GzipDecompressStream(stream, bufferSizeHint);
+  } else if (codec == SnappyCodec.name) {
+    result = new SnappyDecompressStream(stream, bufferSizeHint);
+  } else if (codec == Lz4Codec.name) {
+    result = new Lz4DecompressStream(stream, bufferSizeHint);
+  }
+  return result;
+}
+
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMPRESSIONS_H_
+#define COMPRESSIONS_H_
+
+#include <string>
+#include <vector>
+#include "Streams.h"
+
+namespace NativeTask {
+
+using std::vector;
+using std::string;
+
+/**
+ * Base class for compressing output streams layered on another
+ * OutputStream. The virtual methods provide defaults that subclasses may
+ * override.
+ */
+class CompressStream : public FilterOutputStream {
+public:
+ CompressStream(OutputStream * stream)
+ : FilterOutputStream(stream) {
+ }
+
+ virtual ~CompressStream();
+
+ // Default implementation throws UnsupportException (see Compressions.cc).
+ virtual void writeDirect(const void * buff, uint32_t length);
+
+ // Default: finishing is just a flush().
+ virtual void finish() {
+ flush();
+ }
+
+ // Default: no state to reset.
+ virtual void resetState() {
+
+ }
+
+ // Default: reports 0.
+ virtual uint64_t compressedBytesWritten() {
+ return 0;
+ }
+};
+
+/**
+ * Base class for decompressing input streams layered on another
+ * InputStream. The virtual methods provide defaults that subclasses may
+ * override.
+ */
+class DecompressStream : public FilterInputStream {
+public:
+ DecompressStream(InputStream * stream)
+ : FilterInputStream(stream) {
+ }
+
+ virtual ~DecompressStream();
+
+ // Default implementation throws UnsupportException (see Compressions.cc).
+ virtual int32_t readDirect(void * buff, uint32_t length);
+
+ // Default: reports 0.
+ virtual uint64_t compressedBytesRead() {
+ return 0;
+ }
+};
+
+/**
+ * Static registry of the supported compression codecs and factory for
+ * their compress / decompress streams.
+ */
+class Compressions {
+protected:
+ // Pairs a codec name with the file extension it uses.
+ class Codec {
+ public:
+ string name;
+ string extension;
+
+ Codec(const string & name, const string & extension)
+ : name(name), extension(extension) {
+ }
+ };
+
+ // Populated on first use by initCodecs().
+ static vector<Codec> SupportedCodecs;
+
+ static void initCodecs();
+
+public:
+ static const Codec GzipCodec;
+ static const Codec SnappyCodec;
+ static const Codec Lz4Codec;
+
+public:
+ // True when codec names a supported codec.
+ static bool support(const string & codec);
+
+ // Extension for a codec name; empty string when unknown.
+ static const string getExtension(const string & codec);
+
+ // Codec name for an extension; empty string when unknown.
+ static const string getCodec(const string & extension);
+
+ // Codec name whose extension is a suffix of file; empty string when none.
+ static const string getCodecByFile(const string & file);
+
+ // New compress stream for the codec; NULL when the codec is unknown.
+ static CompressStream * getCompressionStream(const string & codec, OutputStream * stream,
+ uint32_t bufferSizeHint);
+
+ // New decompress stream for the codec; NULL when the codec is unknown.
+ static DecompressStream * getDecompressionStream(const string & codec, InputStream * stream,
+ uint32_t bufferSizeHint);
+};
+
+} // namespace NativeTask
+
+#endif /* COMPRESSIONS_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONSTANTS_H_
+#define CONSTANTS_H_
+
+// Byte widths of 32-bit length fields.
+// NOTE(review): this header uses uint32_t without including <stdint.h>; it
+// relies on the includer having done so -- confirm.
+const uint32_t SIZE_OF_PARTITION_LENGTH = sizeof(uint32_t);
+const uint32_t SIZE_OF_KEY_LENGTH = sizeof(uint32_t);
+const uint32_t SIZE_OF_VALUE_LENGTH = sizeof(uint32_t);
+// Combined width of the key-length and value-length fields.
+const uint32_t SIZE_OF_KV_LENGTH = SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+
+#endif //CONSTANTS_H_
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <errno.h>
+#include <dirent.h>
+#include <sys/stat.h>
+#include <jni.h>
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "jniutils.h"
+#include "NativeTask.h"
+#include "TaskCounters.h"
+#include "NativeObjectFactory.h"
+#include "Path.h"
+#include "FileSystem.h"
+
+namespace NativeTask {
+
+/////////////////////////////////////////////////////////////
+
+/**
+ * Opens path for binary reading and looks up the FILE_BYTES_READ counter.
+ * Throws IOException when the file cannot be opened.
+ */
+FileInputStream::FileInputStream(const string & path) {
+  _handle = fopen(path.c_str(), "rb");
+  if (_handle == NULL) {
+    _fd = -1;
+    THROW_EXCEPTION_EX(IOException, "Can't open raw file: [%s]", path.c_str());
+  }
+  _fd = fileno(_handle);
+  _path = path;
+  _bytesRead = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP,
+      TaskCounters::FILE_BYTES_READ);
+}
+
+// Closes the underlying file if it is still open.
+FileInputStream::~FileInputStream() {
+ close();
+}
+
+/**
+ * Moves the read position to the absolute byte offset position.
+ * Throws IOException when the underlying lseek() fails, so that later
+ * reads cannot silently come from the wrong offset.
+ */
+void FileInputStream::seek(uint64_t position) {
+  if (::lseek(_fd, position, SEEK_SET) < 0) {
+    THROW_EXCEPTION_EX(IOException, "Seek to %llu failed: [%s]",
+        (unsigned long long)position, _path.c_str());
+  }
+}
+
+/**
+ * Returns the current offset of the file descriptor, as reported by
+ * lseek(fd, 0, SEEK_CUR). The lseek result is returned without an error
+ * check.
+ */
+uint64_t FileInputStream::tell() {
+  const off_t offset = ::lseek(_fd, 0, SEEK_CUR);
+  return offset;
+}
+
+/**
+ * Reads up to length bytes into buff with a single read() call.
+ * Returns the value reported by read(); a positive count is also added to
+ * the bytes-read counter.
+ */
+int32_t FileInputStream::read(void * buff, uint32_t length) {
+  const int32_t count = ::read(_fd, buff, length);
+  if (count <= 0) {
+    return count;
+  }
+  _bytesRead->increase(count);
+  return count;
+}
+
+/**
+ * Closes the file if it is open; calling it again is a no-op.
+ */
+void FileInputStream::close() {
+  if (_handle == NULL) {
+    return;
+  }
+  fclose(_handle);
+  _handle = NULL;
+  _fd = -1;
+}
+
+/////////////////////////////////////////////////////////////
+
+/**
+ * Opens path for binary writing ("wb") and looks up the FILE_BYTES_WRITTEN
+ * counter. The overwite flag is not consulted by this constructor.
+ * Throws IOException when the file cannot be opened.
+ */
+FileOutputStream::FileOutputStream(const string & path, bool overwite) {
+  _handle = fopen(path.c_str(), "wb");
+  if (_handle == NULL) {
+    _fd = -1;
+    THROW_EXCEPTION_EX(IOException, "Open raw file failed: [%s]", path.c_str());
+  }
+  _fd = fileno(_handle);
+  _path = path;
+  _bytesWrite = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP,
+      TaskCounters::FILE_BYTES_WRITTEN);
+}
+
+// Closes the underlying file if it is still open.
+FileOutputStream::~FileOutputStream() {
+ close();
+}
+
+/**
+ * Returns the current offset of the file descriptor, as reported by
+ * lseek(fd, 0, SEEK_CUR). The lseek result is returned without an error
+ * check.
+ */
+uint64_t FileOutputStream::tell() {
+  const off_t offset = ::lseek(_fd, 0, SEEK_CUR);
+  return offset;
+}
+
+/**
+ * Writes exactly length bytes from buff to the file descriptor.
+ *
+ * write() may legitimately transfer fewer bytes than requested or be
+ * interrupted by a signal, so the call is repeated until everything has
+ * been written. Throws IOException on a real error. On success the
+ * bytes-written counter is increased by length.
+ */
+void FileOutputStream::write(const void * buff, uint32_t length) {
+  const char * cursor = static_cast<const char *>(buff);
+  uint32_t remaining = length;
+  while (remaining > 0) {
+    // Keep the result signed: comparing it against an unsigned length would
+    // hide the -1 error value on platforms where ssize_t is 32 bits wide.
+    const ssize_t written = ::write(_fd, cursor, remaining);
+    if (written < 0) {
+      if (errno == EINTR) {
+        continue; // interrupted before any data was written: retry
+      }
+      THROW_EXCEPTION(IOException, "::write error");
+    }
+    if (written == 0) {
+      // No progress and no error code: give up rather than spin forever.
+      THROW_EXCEPTION(IOException, "::write error");
+    }
+    cursor += written;
+    remaining -= static_cast<uint32_t>(written);
+  }
+  _bytesWrite->increase(length);
+}
+
+// Nothing to flush: write() hands the data straight to ::write() on the
+// file descriptor.
+void FileOutputStream::flush() {
+}
+
+/**
+ * Closes the file if it is open; calling it again is a no-op.
+ */
+void FileOutputStream::close() {
+  if (_handle == NULL) {
+    return;
+  }
+  fclose(_handle);
+  _handle = NULL;
+  _fd = -1;
+}
+
+/////////////////////////////////////////////////////////////
+
+/**
+ * FileSystem implementation backed by the local POSIX file system.
+ * Every public method accepts paths with or without a leading "file:"
+ * prefix; the prefix is stripped before the path reaches the OS.
+ */
+class RawFileSystem : public FileSystem {
+protected:
+  /** Strips a leading "file:" prefix, if present. */
+  string getRealPath(const string & path) {
+    if (StringUtil::StartsWith(path, "file:")) {
+      return path.substr(5);
+    }
+    return path;
+  }
+
+  // strerror_r() exists in two flavours: the XSI one returns int and fills
+  // the caller's buffer, the GNU one returns a char* that need not point
+  // into the buffer. These overloads select the right text at compile time.
+  static const char * pickErrorText(int rc, const char * buff) {
+    return (rc == 0) ? buff : "unknown error";
+  }
+
+  static const char * pickErrorText(const char * text, const char * /*buff*/) {
+    return text;
+  }
+
+  /** Returns the system message for the errno value err. */
+  static string describeError(int err) {
+    char buff[256];
+    buff[0] = '\0';
+    return string(pickErrorText(strerror_r(err, buff, sizeof(buff)), buff));
+  }
+
+  /** Wraps text in single quotes for /bin/sh, escaping embedded quotes. */
+  static string shellQuote(const string & text) {
+    string quoted("'");
+    for (size_t i = 0; i < text.length(); i++) {
+      if (text[i] == '\'') {
+        quoted.append("'\\''");
+      } else {
+        quoted.push_back(text[i]);
+      }
+    }
+    quoted.push_back('\'');
+    return quoted;
+  }
+
+  /**
+   * Makes sure path is a directory, creating it with mode when missing.
+   * Returns false when it exists as a non-directory or cannot be created.
+   */
+  static bool ensureDirectory(const char * path, mode_t mode) {
+    struct stat sb;
+    if (::stat(path, &sb) == 0) {
+      return S_ISDIR(sb.st_mode) != 0;
+    }
+    if (::mkdir(path, mode) == 0) {
+      return true;
+    }
+    // Another process or thread may have created it between stat() and
+    // mkdir(); that is fine as long as the result is a directory.
+    return errno == EEXIST && ::stat(path, &sb) == 0 && S_ISDIR(sb.st_mode) != 0;
+  }
+
+public:
+  /** Opens path for reading; throws IOException when it cannot be opened. */
+  InputStream * open(const string & path) {
+    return new FileInputStream(getRealPath(path));
+  }
+
+  /**
+   * Creates path for writing, creating missing parent directories first.
+   * Throws IOException when a directory or the file cannot be created.
+   */
+  OutputStream * create(const string & path, bool overwrite) {
+    string np = getRealPath(path);
+    string parent = Path::GetParent(np);
+    if (parent.length() > 0) {
+      if (!exists(parent)) {
+        mkdirs(parent);
+      }
+    }
+    return new FileOutputStream(np, overwrite);
+  }
+
+  /** Returns the size in bytes of path; throws IOException if stat fails. */
+  uint64_t getLength(const string & path) {
+    struct stat st;
+    if (::stat(getRealPath(path).c_str(), &st) != 0) {
+      const int err = errno; // capture before any other call can change it
+      THROW_EXCEPTION(IOException,
+          StringUtil::Format("stat path %s failed, %s", path.c_str(),
+              describeError(err).c_str()));
+    }
+    return st.st_size;
+  }
+
+  /**
+   * Appends one FileEntry per directory entry of path (excluding "." and
+   * "..") to status. Returns false when the directory cannot be opened.
+   */
+  bool list(const string & path, vector<FileEntry> & status) {
+    const string real = getRealPath(path);
+    DIR * dp = opendir(real.c_str());
+    if (dp == NULL) {
+      return false;
+    }
+
+    struct dirent * dirp;
+    while ((dirp = readdir(dp)) != NULL) {
+      FileEntry entry;
+      entry.name = dirp->d_name;
+      if (entry.name == "." || entry.name == "..") {
+        continue;
+      }
+      if (dirp->d_type == DT_UNKNOWN) {
+        // Some file systems do not report the type in readdir(); ask stat().
+        struct stat st;
+        const string full = real + "/" + entry.name;
+        entry.isDirectory = (::stat(full.c_str(), &st) == 0) && S_ISDIR(st.st_mode) != 0;
+      } else {
+        // d_type is an enumeration, not a bit mask: compare, do not AND.
+        entry.isDirectory = (dirp->d_type == DT_DIR);
+      }
+      status.push_back(entry);
+    }
+    closedir(dp);
+    return true;
+  }
+
+  /**
+   * Removes path. A missing path is logged and ignored. When the plain
+   * remove() fails (e.g. non-empty directory) it falls back to "rm -rf".
+   * Throws IOException when both attempts fail.
+   */
+  void remove(const string & path) {
+    if (!exists(path)) {
+      LOG("[FileSystem] remove file %s not exists, ignore", path.c_str());
+      return;
+    }
+    const string real = getRealPath(path);
+    if (::remove(real.c_str()) != 0) {
+      const int err = errno;
+      // Quote the path so whitespace or shell metacharacters in a file name
+      // cannot change the command; "--" stops option parsing.
+      const string command = string("rm -rf -- ") + shellQuote(real);
+      if (::system(command.c_str()) == 0) {
+        return;
+      }
+      THROW_EXCEPTION(IOException,
+          StringUtil::Format("FileSystem: remove path %s failed, %s", path.c_str(),
+              describeError(err).c_str()));
+    }
+  }
+
+  /** Returns true when stat() succeeds on path. */
+  bool exists(const string & path) {
+    struct stat st;
+    if (::stat(getRealPath(path).c_str(), &st) != 0) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Creates path and all missing parents with permissions nmode.
+   * Returns 0 on success (including "already a directory") and 1 on failure.
+   */
+  int mkdirs(const string & path, mode_t nmode) {
+    string np = getRealPath(path);
+    struct stat sb;
+
+    // Fast path: the target already exists.
+    if (stat(np.c_str(), &sb) == 0) {
+      return (S_ISDIR(sb.st_mode) == 0) ? 1 : 0;
+    }
+
+    // Work on a private, mutable, NUL-terminated copy so that path
+    // separators can be temporarily overwritten.
+    vector<char> work(np.begin(), np.end());
+    work.push_back('\0');
+    char * npath = &work[0];
+
+    /* Skip leading slashes. */
+    char * p = npath;
+    while (*p == '/') {
+      p++;
+    }
+
+    while (NULL != (p = strchr(p, '/'))) {
+      *p = '\0'; // cut the path after the current component
+      if (!ensureDirectory(npath, nmode)) {
+        return 1;
+      }
+      *p++ = '/'; /* restore slash */
+      while (*p == '/') {
+        p++;
+      }
+    }
+
+    /* Create the final directory component. */
+    return ensureDirectory(npath, nmode) ? 0 : 1;
+  }
+
+  /** Creates path and missing parents (mode 0755); throws IOException on failure. */
+  void mkdirs(const string & path) {
+    int ret = mkdirs(path, 0755);
+    if (ret != 0) {
+      THROW_EXCEPTION_EX(IOException, "mkdirs [%s] failed", path.c_str());
+    }
+  }
+};
+
+///////////////////////////////////////////////////////////
+
+// The single RawFileSystem object handed out by FileSystem::getLocal() and
+// FileSystem::get().
+extern RawFileSystem RawFileSystemInstance;
+
+RawFileSystem RawFileSystemInstance = RawFileSystem();
+
+/**
+ * Returns the default file system URI from config: the FS_DEFAULT_NAME
+ * entry, else "fs.defaultFS", else "file:///" when neither is set.
+ */
+string FileSystem::getDefaultFsUri(Config * config) {
+  const char * configured = config->get(FS_DEFAULT_NAME);
+  if (configured == NULL) {
+    configured = config->get("fs.defaultFS");
+  }
+  return (configured != NULL) ? string(configured) : string("file:///");
+}
+
+// Returns the shared local (raw POSIX) file system instance.
+FileSystem & FileSystem::getLocal() {
+ return RawFileSystemInstance;
+}
+
+
+/**
+ * Returns the file system selected by the default-FS setting in config.
+ *
+ * Only the local file system ("file:///") is available here. Any other URI
+ * raises UnsupportException; without this the function would run off its
+ * end without returning a reference, which is undefined behaviour.
+ */
+FileSystem & FileSystem::get(Config * config) {
+  string uri = getDefaultFsUri(config);
+  if (uri != "file:///") {
+    THROW_EXCEPTION_EX(UnsupportException, "Unsupported default filesystem: [%s]", uri.c_str());
+  }
+  return RawFileSystemInstance;
+}
+
+} // namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FILESYSTEM_H_
+#define FILESYSTEM_H_
+
+#include <string>
+#include "NativeTask.h"
+#include "Streams.h"
+
+namespace NativeTask {
+
+class FileSystem;
+
+/**
+ * Local raw filesystem file input stream
+ * with blocking semantics
+ *
+ * NOTE(review): the class holds a raw FILE* that its destructor closes but
+ * declares no copy constructor / assignment; copying an instance would
+ * presumably close the same handle twice -- confirm it is never copied.
+ */
+class FileInputStream : public InputStream {
+private:
+ string _path; // path the stream was opened with
+ FILE * _handle; // stdio handle; NULL once closed
+ int _fd; // descriptor behind _handle; -1 once closed
+ Counter * _bytesRead; // counter increased by read()
+public:
+ FileInputStream(const string & path);
+ virtual ~FileInputStream();
+
+ virtual void seek(uint64_t position);
+
+ virtual uint64_t tell();
+
+ virtual int32_t read(void * buff, uint32_t length);
+
+ virtual void close();
+};
+
+/**
+ * Local raw filesystem file output stream
+ * with blocking semantics
+ *
+ * NOTE(review): the class holds a raw FILE* that its destructor closes but
+ * declares no copy constructor / assignment; copying an instance would
+ * presumably close the same handle twice -- confirm it is never copied.
+ */
+class FileOutputStream : public OutputStream {
+private:
+ string _path; // path the stream was opened with
+ FILE * _handle; // stdio handle; NULL once closed
+ int _fd; // descriptor behind _handle; -1 once closed
+ Counter * _bytesWrite; // counter increased by write()
+public:
+ FileOutputStream(const string & path, bool overwite = true);
+ virtual ~FileOutputStream();
+
+ virtual uint64_t tell();
+
+ virtual void write(const void * buff, uint32_t length);
+
+ virtual void flush();
+
+ virtual void close();
+};
+
+
+/**
+ * One directory entry as reported by FileSystem::list().
+ */
+class FileEntry {
+public:
+ string name; // entry name
+ bool isDirectory; // true when the entry is a directory; not initialized by default
+};
+
+/**
+ * FileSystem interface
+ *
+ * The virtual methods have do-nothing defaults (NULL / 0 / false);
+ * concrete file systems override them. Instances are obtained through the
+ * static accessors at the bottom of the class.
+ */
+class FileSystem {
+protected:
+ FileSystem() {
+ }
+public:
+ virtual ~FileSystem() {
+ }
+
+ // Default: no stream (NULL).
+ virtual InputStream * open(const string & path) {
+ return NULL;
+ }
+
+ // Default: no stream (NULL).
+ virtual OutputStream * create(const string & path, bool overwrite = true) {
+ return NULL;
+ }
+
+ // Default: 0.
+ virtual uint64_t getLength(const string & path) {
+ return 0;
+ }
+
+ // Default: false, status is left untouched.
+ virtual bool list(const string & path, vector<FileEntry> & status) {
+ return false;
+ }
+
+ // Default: does nothing.
+ virtual void remove(const string & path) {
+ }
+
+ // Default: false.
+ virtual bool exists(const string & path) {
+ return false;
+ }
+
+ // Default: does nothing.
+ virtual void mkdirs(const string & path) {
+ }
+
+ static string getDefaultFsUri(Config * config);
+ static FileSystem & getLocal();
+ static FileSystem & getJava(Config * config);
+ static FileSystem & get(Config * config);
+};
+
+} // namespace NativeTask
+
+#endif /* FILESYSTEM_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "IFile.h"
+#include "Compressions.h"
+#include "lib/FileSystem.h"
+
+namespace NativeTask {
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Build a reader over `stream` using the checksum type, key/value types and
+ * codec recorded in `spill`. `spill` must not be NULL.
+ * When deleteInputStream is true the destructor also deletes `stream`.
+ */
+IFileReader::IFileReader(InputStream * stream, SingleSpillInfo * spill, bool deleteInputStream)
+    : _stream(stream),
+      _source(NULL),
+      _checksumType(spill->checkSumType),
+      _kType(spill->keyType),
+      _vType(spill->valueType),
+      _codec(spill->codec),
+      _segmentIndex(-1),
+      _spillInfo(spill),
+      _valuePos(NULL),
+      _valueLen(0),
+      _deleteSourceStream(deleteInputStream) {
+  // Wrap the raw stream in a ChecksumInputStream.
+  ChecksumInputStream * checksummed = new ChecksumInputStream(_stream, _checksumType);
+  // Limit 0: nextPartition() sets the real limit for each segment.
+  checksummed->setLimit(0);
+  _source = checksummed;
+
+  _reader.init(128 * 1024, _source, _codec);
+}
+
+/**
+ * Releases the checksum wrapper, and the underlying stream as well when the
+ * reader was constructed with deleteInputStream == true.
+ */
+IFileReader::~IFileReader() {
+  // The wrapper was created by the constructor and is always released.
+  delete _source;
+  _source = NULL;
+
+  // The underlying stream is released only on request.
+  if (!_deleteSourceStream) {
+    return;
+  }
+  delete _stream;
+  _stream = NULL;
+}
+
+/**
+ * Move to the next partition (segment) of the spill file.
+ *
+ * The previous segment must have been read up to its limit. Its 4-byte
+ * checksum trailer is then read from the raw stream and compared with the
+ * checksum accumulated while reading that segment.
+ *
+ * @return true if a new partition is ready to be read,
+ *         false if there are no more partitions
+ * @throws IOException if the previous segment was not fully consumed, the
+ *         checksum is missing or does not match, or the recorded segment
+ *         length is too small to hold the checksum trailer
+ */
+bool IFileReader::nextPartition() {
+  if (0 != _source->getLimit()) {
+    THROW_EXCEPTION(IOException, "bad ifile segment length");
+  }
+  if (_segmentIndex >= 0) {
+    // verify checksum of the segment that was just finished
+    uint32_t chsum = 0;
+    if (4 != _stream->readFully(&chsum, 4)) {
+      THROW_EXCEPTION(IOException, "read ifile checksum failed");
+    }
+    uint32_t actual = bswap(chsum);
+    uint32_t expect = _source->getChecksum();
+    if (actual != expect) {
+      THROW_EXCEPTION_EX(IOException, "read ifile checksum not match, actual %x expect %x", actual,
+          expect);
+    }
+  }
+  _segmentIndex++;
+  if (_segmentIndex >= (int)(_spillInfo->length)) {
+    return false;
+  }
+
+  // realEndOffset values are cumulative, so the length of this segment is the
+  // difference to the end of the previous one.
+  int64_t end_pos = (int64_t)_spillInfo->segments[_segmentIndex].realEndOffset;
+  if (_segmentIndex > 0) {
+    end_pos -= (int64_t)_spillInfo->segments[_segmentIndex - 1].realEndOffset;
+  }
+  // Every segment ends with a 4-byte checksum; anything shorter is corrupt
+  // and would make the limit computed below negative.
+  if (end_pos < 4) {
+    THROW_EXCEPTION(IOException, "bad ifile format");
+  }
+  // exclude checksum
+  _source->setLimit(end_pos - 4);
+  _source->resetChecksum();
+  return true;
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Create (overwriting if present) `filepath` on the local file system and
+ * return a writer configured from `spec`. The writer is constructed with
+ * deleteTargetStream == true, so it deletes the stream it was given.
+ */
+IFileWriter * IFileWriter::create(const std::string & filepath, const MapOutputSpec & spec,
+    Counter * spilledRecords) {
+  const bool overwriteExisting = true;
+  const bool writerOwnsStream = true;
+
+  OutputStream * target = FileSystem::getLocal().create(filepath, overwriteExisting);
+  return new IFileWriter(target, spec.checksumType, spec.keyType, spec.valueType, spec.codec,
+      spilledRecords, writerOwnsStream);
+}
+
+/**
+ * Build a writer on top of `stream`. Output goes through a
+ * ChecksumOutputStream and a 128K append buffer initialised with `codec`.
+ * `counter` may be NULL; when set it is increased once per record written.
+ * When deleteTargetStream is true the destructor also deletes `stream`.
+ */
+IFileWriter::IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype,
+    KeyValueType vtype, const string & codec, Counter * counter, bool deleteTargetStream)
+    : _stream(stream),
+      _dest(NULL),
+      _checksumType(checksumType),
+      _kType(ktype),
+      _vType(vtype),
+      _codec(codec),
+      _recordCounter(counter),
+      _deleteTargetStream(deleteTargetStream) {
+  _dest = new ChecksumOutputStream(_stream, _checksumType);
+
+  const uint32_t bufferBytes = 128 * 1024;
+  _appendBuffer.init(bufferBytes, _dest, _codec);
+}
+
+/**
+ * Releases the checksum wrapper, and the target stream as well when the
+ * writer was constructed with deleteTargetStream == true.
+ */
+IFileWriter::~IFileWriter() {
+  // The wrapper was created by the constructor and is always released.
+  delete _dest;
+  _dest = NULL;
+
+  // The target stream is released only on request.
+  if (!_deleteTargetStream) {
+    return;
+  }
+  delete _stream;
+  _stream = NULL;
+}
+
+/**
+ * Begin a new partition: append a fresh segment record (filled in by
+ * endPartition()) and restart the running checksum.
+ */
+void IFileWriter::startPartition() {
+  // resize() appends one value-initialized IFileSegment
+  _spillFileSegments.resize(_spillFileSegments.size() + 1);
+  _dest->resetChecksum();
+}
+
+/**
+ * Finish the partition opened by startPartition(): write the end marker,
+ * flush buffered (and compressed) data, append the 4-byte checksum and record
+ * the end offsets of the segment.
+ *
+ * @throws IOException if no partition has been started
+ */
+void IFileWriter::endPartition() {
+  if (_spillFileSegments.empty()) {
+    // Without a started partition there is no segment record to fill in;
+    // indexing the empty vector below would be undefined behaviour.
+    THROW_EXCEPTION(IOException, "endPartition called before startPartition");
+  }
+
+  // End marker: two bytes of -1. IFileReader::nextKey() stops when the key
+  // length it reads is -1. The explicit casts keep the initializer well-formed
+  // on platforms where plain char is unsigned (brace-initialization rejects
+  // the narrowing conversion there).
+  const char EOFMarker[2] = {(char)-1, (char)-1};
+  _appendBuffer.write(EOFMarker, 2);
+  _appendBuffer.flush();
+
+  CompressStream * compressionStream = _appendBuffer.getCompressionStream();
+  if (NULL != compressionStream) {
+    compressionStream->finish();
+    compressionStream->resetState();
+  }
+
+  // The checksum is written byte-swapped straight to the raw stream, so it is
+  // not part of the checksummed data itself.
+  uint32_t chsum = _dest->getChecksum();
+  chsum = bswap(chsum);
+  _stream->write(&chsum, sizeof(chsum));
+  _stream->flush();
+
+  IFileSegment & info = _spillFileSegments.back();
+  info.uncompressedEndOffset = _appendBuffer.getCounter();
+  info.realEndOffset = _stream->tell();
+}
+
+/**
+ * Append one key/value record to the current partition.
+ *
+ * Layout: vuint pair (serialized key size, serialized value size), then the
+ * key and the value. For TextType a vuint length prefix is written before
+ * the bytes, for BytesType a 4-byte big-endian length; other types have no
+ * prefix. The record counter, when set, is increased once per call.
+ */
+void IFileWriter::write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) {
+  // Serialized size of each side = payload plus the type-specific prefix.
+  uint32_t keyBuffLen = keyLen;
+  if (TextType == _kType) {
+    keyBuffLen += WritableUtils::GetVLongSize(keyLen);
+  } else if (BytesType == _kType) {
+    keyBuffLen += 4;
+  }
+
+  uint32_t valBuffLen = valueLen;
+  if (TextType == _vType) {
+    valBuffLen += WritableUtils::GetVLongSize(valueLen);
+  } else if (BytesType == _vType) {
+    valBuffLen += 4;
+  }
+
+  // record header
+  _appendBuffer.write_vuint2(keyBuffLen, valBuffLen);
+
+  // key: optional length prefix followed by the raw bytes
+  if (TextType == _kType) {
+    _appendBuffer.write_vuint(keyLen);
+  } else if (BytesType == _kType) {
+    _appendBuffer.write_uint32_be(keyLen);
+  }
+  if (0 != keyLen) {
+    _appendBuffer.write(key, keyLen);
+  }
+
+  if (NULL != _recordCounter) {
+    _recordCounter->increase();
+  }
+
+  // value: optional length prefix followed by the raw bytes
+  if (TextType == _vType) {
+    _appendBuffer.write_vuint(valueLen);
+  } else if (BytesType == _vType) {
+    _appendBuffer.write_uint32_be(valueLen);
+  }
+  if (0 != valueLen) {
+    _appendBuffer.write(value, valueLen);
+  }
+}
+
+/**
+ * Copy the segments into a newly allocated array (new[]) of the same length.
+ * The caller takes ownership of the returned array.
+ */
+IFileSegment * IFileWriter::toArray(std::vector<IFileSegment> *segments) {
+  IFileSegment * copy = new IFileSegment[segments->size()];
+  IFileSegment * out = copy;
+  for (std::vector<IFileSegment>::const_iterator it = segments->begin(); it != segments->end();
+      ++it) {
+    *out = *it;
+    ++out;
+  }
+  return copy;
+}
+
+/**
+ * Build a newly allocated SingleSpillInfo describing all segments written so
+ * far. The path is left empty; the caller fills it in and owns the result.
+ */
+SingleSpillInfo * IFileWriter::getSpillInfo() {
+  const uint32_t segmentCount = _spillFileSegments.size();
+  IFileSegment * segmentArray = toArray(&_spillFileSegments);
+  return new SingleSpillInfo(segmentArray, segmentCount, "", _checksumType, _kType, _vType,
+      _codec);
+}
+
+/**
+ * Report the end offsets of the most recently recorded segment.
+ *
+ * @param offset      receives the uncompressed end offset (0 if no segment)
+ * @param realOffset  receives the end offset in the target stream (0 if no segment)
+ */
+void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) {
+  if (_spillFileSegments.empty()) {
+    offset = 0;
+    realOffset = 0;
+    return;
+  }
+  const IFileSegment & last = _spillFileSegments.back();
+  offset = last.uncompressedEndOffset;
+  realOffset = last.realEndOffset;
+}
+
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef IFILE_H_
+#define IFILE_H_
+
+#include "Checksum.h"
+#include "Buffers.h"
+#include "WritableUtils.h"
+#include "SpillInfo.h"
+#include "MapOutputSpec.h"
+
+namespace NativeTask {
+
+/**
+ * IFileReader
+ *
+ * Reads the key/value records of a spill file partition by partition:
+ * call nextPartition() to open a partition, then nextKey() and value()
+ * until nextKey() returns NULL.
+ */
+class IFileReader {
+private:
+ InputStream * _stream; // underlying stream; deleted by the destructor only if _deleteSourceStream
+ ChecksumInputStream * _source; // checksum wrapper around _stream, created and deleted by the reader
+ ReadBuffer _reader;
+ ChecksumType _checksumType;
+ KeyValueType _kType;
+ KeyValueType _vType;
+ string _codec;
+ int32_t _segmentIndex; // index of the current partition; -1 until the first nextPartition()
+ SingleSpillInfo * _spillInfo;
+ const char * _valuePos; // start of the value belonging to the key returned by the last nextKey()
+ uint32_t _valueLen; // length of that value
+ bool _deleteSourceStream;
+
+public:
+ IFileReader(InputStream * stream, SingleSpillInfo * spill, bool deleteSourceStream = false);
+
+ virtual ~IFileReader();
+
+ /**
+ * @return true if the reader moved to the next partition, false if there
+ * is no more partition
+ */
+ bool nextPartition();
+
+ /**
+ * get next key
+ * NULL if no more, then nextPartition() needs to be called
+ * NOTICE: before value() is called, the return pointer value is
+ * guaranteed to be valid
+ * @param keyLen receives the length of the returned key
+ */
+ const char * nextKey(uint32_t & keyLen) {
+ int64_t t1 = _reader.readVLong(); // serialized key size, or -1 at the end-of-partition marker
+ int64_t t2 = _reader.readVLong(); // serialized value size (the marker carries a second -1)
+ if (t1 == -1) {
+ return NULL;
+ }
+ const char * kvbuff = _reader.get((uint32_t)(t1 + t2)); // serialized key immediately followed by serialized value
+ uint32_t len; // size of the length prefix in front of the key bytes
+ switch (_kType) {
+ case TextType:
+ keyLen = WritableUtils::ReadVInt(kvbuff, len);
+ break;
+ case BytesType:
+ keyLen = bswap(*(uint32_t*)kvbuff); // 4-byte prefix written with write_uint32_be; NOTE(review): kvbuff may be unaligned
+ len = 4;
+ break;
+ default:
+ keyLen = t1; // no prefix: the whole serialized key is the key
+ len = 0;
+ }
+ const char * kbuff = kvbuff + len;
+ const char * vbuff = kvbuff + (uint32_t)t1;
+ switch (_vType) {
+ case TextType:
+ _valueLen = WritableUtils::ReadVInt(vbuff, len);
+ _valuePos = vbuff + len;
+ break;
+ case BytesType:
+ _valueLen = bswap(*(uint32_t*)vbuff); // NOTE(review): vbuff may be unaligned
+ _valuePos = vbuff + 4;
+ break;
+ default:
+ _valueLen = t2;
+ _valuePos = vbuff;
+ }
+ return kbuff;
+ }
+
+ /**
+ * length of current value part of IFile entry
+ */
+ uint32_t valueLen() {
+ return _valueLen;
+ }
+
+ /**
+ * get current value
+ * @param valueLen receives the length of the returned value
+ */
+ const char * value(uint32_t & valueLen) {
+ valueLen = _valueLen;
+ return _valuePos;
+ }
+};
+
+/**
+ * IFile Writer
+ *
+ * Writes key/value records partition by partition: startPartition(),
+ * any number of write()/collect() calls, then endPartition(), which
+ * appends the end marker and the checksum and records the segment offsets.
+ */
+class IFileWriter : public Collector {
+protected:
+ OutputStream * _stream; // raw target; deleted by the destructor only if _deleteTargetStream
+ ChecksumOutputStream * _dest; // checksum wrapper around _stream, created and deleted by the writer
+ ChecksumType _checksumType;
+ KeyValueType _kType;
+ KeyValueType _vType;
+ string _codec;
+ AppendBuffer _appendBuffer;
+ vector<IFileSegment> _spillFileSegments; // one entry per startPartition() call
+ Counter * _recordCounter; // may be NULL; increased once per record written
+
+ bool _deleteTargetStream;
+
+private:
+ IFileSegment * toArray(std::vector<IFileSegment> *segments); // returns a new[] copy of the segments
+
+public:
+ static IFileWriter * create(const std::string & filepath, const MapOutputSpec & spec,
+ Counter * spilledRecords);
+
+ IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype,
+ KeyValueType vtype, const string & codec, Counter * recordCounter,
+ bool deleteTargetStream = false);
+
+ virtual ~IFileWriter();
+
+ void startPartition();
+
+ void endPartition();
+
+ virtual void write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen);
+
+ SingleSpillInfo * getSpillInfo(); // returns a newly allocated object with an empty path
+
+ void getStatistics(uint64_t & offset, uint64_t & realOffset); // end offsets of the last segment, 0/0 if none
+
+ virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) { // forwards to write()
+ write((const char*)key, keyLen, (const char*)value, valueLen);
+ }
+};
+
+} // namespace NativeTask
+
+#endif /* IFILE_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Iterator.h"
+#include "commons.h"
+
+namespace NativeTask {
+
+/**
+ * Wrap `iterator`; nothing is read from it until the first nextKey().
+ */
+KeyGroupIteratorImpl::KeyGroupIteratorImpl(KVIterator * iterator)
+    : _keyGroupIterState(NEW_KEY),
+      _iterator(iterator),
+      _first(true) {
+}
+
+/**
+ * Advance to the next key group, skipping any values of the current group
+ * that have not been consumed.
+ *
+ * @return true if a new group is available, false when the input is exhausted
+ */
+bool KeyGroupIteratorImpl::nextKey() {
+  if (NO_MORE == _keyGroupIterState) {
+    return false;
+  }
+
+  // Drain the rest of the current group; nextValue() moves the state to
+  // NEW_KEY or NO_MORE once the group ends.
+  uint32_t ignoredLen;
+  while (_keyGroupIterState == SAME_KEY || _keyGroupIterState == NEW_KEY_VALUE) {
+    nextValue(ignoredLen);
+  }
+
+  if (NEW_KEY != _keyGroupIterState) {
+    return false;
+  }
+
+  // On the very first call no pair has been read yet.
+  if (_first) {
+    _first = false;
+    if (!next()) {
+      _keyGroupIterState = NO_MORE;
+      return false;
+    }
+  }
+
+  // Remember the group key so nextValue() can detect the group boundary.
+  _currentGroupKey.assign(_key.data(), _key.length());
+  _keyGroupIterState = NEW_KEY_VALUE;
+  return true;
+}
+
+/**
+ * @param len receives the length of the returned key
+ * @return the key of the pair read most recently from the underlying iterator
+ */
+const char * KeyGroupIteratorImpl::getKey(uint32_t & len) {
+  const char * keyData = _key.data();
+  len = (uint32_t)_key.length();
+  return keyData;
+}
+
+/**
+ * Return the next value of the current key group.
+ *
+ * @param len receives the value length when a value is returned
+ * @return the value, or NULL when the group (or the whole input) is finished
+ */
+const char * KeyGroupIteratorImpl::nextValue(uint32_t & len) {
+  if (NEW_KEY_VALUE == _keyGroupIterState) {
+    // First value of the group: the pair has already been read.
+    _keyGroupIterState = SAME_KEY;
+    len = _value.length();
+    return _value.data();
+  }
+
+  if (SAME_KEY != _keyGroupIterState) {
+    // NEW_KEY or NO_MORE: nothing to hand out.
+    return NULL;
+  }
+
+  if (!next()) {
+    _keyGroupIterState = NO_MORE;
+    return NULL;
+  }
+
+  const bool sameGroup = (_key.length() == _currentGroupKey.length())
+      && fmemeq(_key.data(), _currentGroupKey.c_str(), _key.length());
+  if (!sameGroup) {
+    // The pair just read belongs to the next group; nextKey() picks it up.
+    _keyGroupIterState = NEW_KEY;
+    return NULL;
+  }
+
+  len = _value.length();
+  return _value.data();
+}
+
+/**
+ * Read the next pair from the underlying iterator into _key/_value.
+ *
+ * @return whatever the underlying iterator's next() returns
+ */
+bool KeyGroupIteratorImpl::next() {
+  return _iterator->next(_key, _value);
+}
+
+} //namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ITERATOR_H_
+#define ITERATOR_H_
+
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+class KeyGroupIteratorImpl : public KeyGroupIterator { // groups consecutive pairs with equal keys from a KVIterator
+protected:
+ // for KeyGroupIterator
+ KeyGroupIterState _keyGroupIterState;
+ KVIterator * _iterator; // source of key/value pairs; not deleted by the code shown in Iterator.cc
+ string _currentGroupKey; // copy of the key of the group being iterated
+ Buffer _key; // key of the pair read most recently
+ Buffer _value; // value of the pair read most recently
+ bool _first; // true until the first nextKey() call that reaches the NEW_KEY state
+
+public:
+ KeyGroupIteratorImpl(KVIterator * iterator);
+ bool nextKey();
+ const char * getKey(uint32_t & len);
+ const char * nextValue(uint32_t & len);
+
+protected:
+ bool next();
+};
+
+} //namespace NativeTask
+#endif
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Log.h"
+
+namespace NativeTask {
+
+#ifdef PRINT_LOG
+
+FILE * LOG_DEVICE = stderr; // stream the LOG() macro prints to; LOG() prints nothing while this is NULL
+
+#endif
+
+} //namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LOG_H_
+#define LOG_H_
+
+#include <stdio.h>
+#include <time.h>
+
+namespace NativeTask {
+
+#define PRINT_LOG
+
+#ifdef PRINT_LOG
+
+// Stream LOG() prints to; nothing is printed while it is NULL.
+extern FILE * LOG_DEVICE;
+// Prints "yy/mm/dd hh:mm:ss INFO <message>\n" using local time.
+// _fmt_ must be a string literal: it is concatenated with the prefix at
+// compile time. The spaces around _fmt_ are required, because since C++11 a
+// string literal immediately followed by an identifier is parsed as a
+// user-defined literal suffix.
+#define LOG(_fmt_, args...) if (LOG_DEVICE) { \
+    time_t log_timer; struct tm log_tm; \
+    time(&log_timer); localtime_r(&log_timer, &log_tm); \
+    fprintf(LOG_DEVICE, "%02d/%02d/%02d %02d:%02d:%02d INFO " _fmt_ "\n", \
+        log_tm.tm_year%100, log_tm.tm_mon+1, log_tm.tm_mday, \
+        log_tm.tm_hour, log_tm.tm_min, log_tm.tm_sec, \
+        ##args);}
+
+#else
+
+#define LOG(_fmt_, args...)
+
+#endif
+
+} // namespace NativeTask
+
+#endif /* LOG_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "FileSystem.h"
+#include "NativeObjectFactory.h"
+#include "MapOutputCollector.h"
+#include "Merge.h"
+#include "NativeTask.h"
+#include "WritableUtils.h"
+#include "util/DualPivotQuickSort.h"
+#include "Combiner.h"
+#include "TaskCounters.h"
+#include "MinHeap.h"
+
+namespace NativeTask {
+
+/**
+ * Create the combiner to delegate to.
+ *
+ * A native combiner configured under NATIVE_COMBINER takes precedence;
+ * otherwise the Java combine handler of the spill output service is used.
+ *
+ * @return the combiner, or NULL if neither is available
+ * @throws UnsupportException if the configured native combiner is unknown
+ */
+ICombineRunner * CombineRunnerWrapper::createCombiner() {
+  const char * nativeCombinerClass = _config->get(NATIVE_COMBINER);
+  if (NULL != nativeCombinerClass) {
+    ObjectCreatorFunc creator = NativeObjectFactory::GetObjectCreator(nativeCombinerClass);
+    if (NULL == creator) {
+      THROW_EXCEPTION_EX(UnsupportException, "Combiner not found: %s", nativeCombinerClass);
+    }
+    LOG("[MapOutputCollector::configure] native combiner is enabled: %s", nativeCombinerClass);
+    return new NativeCombineRunner(_config, creator);
+  }
+
+  CombineHandler * javaHandler = _spillOutput->getJavaCombineHandler();
+  if (NULL == javaHandler) {
+    LOG("[MapOutputCollector::getCombiner] cannot get combine handler from java");
+    return NULL;
+  }
+
+  // The destructor does not delete a Java combiner.
+  _isJavaCombiner = true;
+  return (ICombineRunner *)javaHandler;
+}
+
+/**
+ * Run the wrapped combiner, creating it on first use. When no combiner can
+ * be created the call only logs a message.
+ */
+void CombineRunnerWrapper::combine(CombineContext type, KVIterator * kvIterator,
+    IFileWriter * writer) {
+  // Lazy creation; it is attempted only once, even if it yields NULL.
+  if (!_combinerInited) {
+    _combineRunner = createCombiner();
+    _combinerInited = true;
+  }
+
+  if (NULL == _combineRunner) {
+    LOG("[CombineRunnerWrapper::combine] no valid combiner");
+    return;
+  }
+  _combineRunner->combine(type, kvIterator, writer);
+}
+
+/////////////////////////////////////////////////////////////////
+// MapOutputCollector
+/////////////////////////////////////////////////////////////////
+
+/**
+ * Create a collector for `numberPartitions` partitions. Only the memory pool
+ * object is allocated here; buckets are created by init().
+ */
+MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, SpillOutputService * spillService)
+    : _config(NULL),
+      _buckets(NULL),
+      _keyComparator(NULL),
+      _defaultBlockSize(0),
+      _combineRunner(NULL),
+      _spilledRecords(NULL),
+      _spillOutput(spillService),
+      _pool(NULL),
+      _numPartitions(numberPartitions) {
+  _pool = new MemoryPool();
+}
+
+/**
+ * Releases the partition buckets, the memory pool and the combine runner.
+ */
+MapOutputCollector::~MapOutputCollector() {
+  // delete on a NULL pointer is a no-op, so individual entries need no check
+  if (NULL != _buckets) {
+    for (uint32_t partition = 0; partition < _numPartitions; partition++) {
+      delete _buckets[partition];
+      _buckets[partition] = NULL;
+    }
+    delete[] _buckets;
+    _buckets = NULL;
+  }
+
+  delete _pool;
+  _pool = NULL;
+
+  delete _combineRunner;
+  _combineRunner = NULL;
+}
+
+/**
+ * Initialise the memory pool and create one PartitionBucket per partition.
+ *
+ * @param defaultBlockSize block size handed to every bucket
+ * @param memoryCapacity   capacity handed to the memory pool
+ * @param keyComparator    comparator handed to every bucket
+ * @param spilledRecords   counter passed to the spill writers
+ * @param combiner         combine runner, may be NULL; deleted by the destructor
+ */
+void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity,
+    ComparatorPtr keyComparator, Counter * spilledRecords, ICombineRunner * combiner) {
+
+  this->_combineRunner = combiner;
+
+  this->_defaultBlockSize = defaultBlockSize;
+
+  _pool->init(memoryCapacity);
+
+  //TODO: add support for customized comparator
+  this->_keyComparator = keyComparator;
+
+  // Value-initialize the array so that every slot is NULL. If constructing a
+  // bucket throws, the destructor then sees NULL instead of indeterminate
+  // pointers for the slots that were never filled.
+  _buckets = new PartitionBucket*[_numPartitions]();
+
+  for (uint32_t partitionId = 0; partitionId < _numPartitions; partitionId++) {
+    PartitionBucket * pb = new PartitionBucket(_pool, partitionId, keyComparator, _combineRunner,
+        defaultBlockSize);
+
+    _buckets[partitionId] = pb;
+  }
+
+  _spilledRecords = spilledRecords;
+
+  _collectTimer.reset();
+}
+
+/**
+ * Reset every partition bucket and then the memory pool. Called after the
+ * buffered data has been spilled.
+ */
+void MapOutputCollector::reset() {
+  for (uint32_t partition = 0; partition < _numPartitions; partition++) {
+    PartitionBucket * bucket = _buckets[partition];
+    if (NULL == bucket) {
+      continue;
+    }
+    bucket->reset();
+  }
+  _pool->reset();
+}
+
+/**
+ * Read the map output spec, buffer sizes, key comparator and combiner
+ * settings from `config` and initialise the collector with them.
+ */
+void MapOutputCollector::configure(Config * config) {
+  _config = config;
+  MapOutputSpec::getSpecFromConfig(config, _spec);
+
+  // buffer sizing
+  uint32_t blockSizeCap = config->getInt(NATIVE_SORT_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE);
+  uint32_t poolBytes = config->getInt(MAPRED_IO_SORT_MB, 300) * 1024 * 1024;
+  uint32_t blockSize = getDefaultBlockSize(poolBytes, _numPartitions, blockSizeCap);
+  LOG("Native Total MemoryBlockPool: num_partitions %u, min_block_size %uK, max_block_size %uK, capacity %uM", _numPartitions, blockSize / 1024,
+      blockSizeCap / 1024, poolBytes / 1024 / 1024);
+
+  ComparatorPtr keyOrder = getComparator(config, _spec);
+
+  Counter * spillCounter = NativeObjectFactory::GetCounter(TaskCounters::TASK_COUNTER_GROUP,
+      TaskCounters::SPILLED_RECORDS);
+
+  // A combiner is wrapped if any of the native / old-API / new-API settings
+  // is present.
+  const bool combinerConfigured = NULL != config->get(NATIVE_COMBINER)
+      || NULL != config->get(MAPRED_COMBINE_CLASS_OLD)
+      || NULL != config->get(MAPRED_COMBINE_CLASS_NEW);
+  ICombineRunner * combineWrapper = NULL;
+  if (combinerConfigured) {
+    combineWrapper = new CombineRunnerWrapper(config, _spillOutput);
+  }
+
+  init(blockSize, poolBytes, keyOrder, spillCounter, combineWrapper);
+}
+
+/**
+ * Reserve room for one key/value record in the given partition.
+ *
+ * If the partition cannot provide the space, the buffered data is spilled to
+ * the path supplied by the spill output service and the allocation is retried.
+ *
+ * @param partitionId target partition
+ * @param kvlength    number of bytes to reserve
+ * @return the reserved buffer, never NULL
+ * @throws IOException          if the partition does not exist or no spill path is available
+ * @throws OutOfMemoryException if the record does not fit even after spilling
+ */
+KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) {
+  PartitionBucket * partition = getPartition(partitionId);
+  if (NULL == partition) {
+    THROW_EXCEPTION_EX(IOException, "Partition is NULL, partition_id: %d, num_partitions: %d", partitionId, _numPartitions);
+  }
+
+  KVBuffer * dest = partition->allocateKVBuffer(kvlength);
+
+  if (NULL == dest) {
+    string * spillpath = _spillOutput->getSpillPath();
+    if (NULL == spillpath) {
+      THROW_EXCEPTION(IOException, "Illegal(empty) spill files path");
+    }
+    // Copy the path and free the heap string right away, so that it is not
+    // leaked when the path is empty or when the spill itself throws.
+    const string path(*spillpath);
+    delete spillpath;
+    if (path.empty()) {
+      THROW_EXCEPTION(IOException, "Illegal(empty) spill files path");
+    }
+    middleSpill(path, "");
+
+    dest = partition->allocateKVBuffer(kvlength);
+    if (NULL == dest) {
+      // io.sort.mb too small, cann't proceed
+      // should not get here, cause get_buffer_to_put can throw OOM exception
+      THROW_EXCEPTION(OutOfMemoryException, "key/value pair larger than io.sort.mb");
+    }
+  }
+  return dest;
+}
+
+/**
+ * Collect one key/value pair into the buffer of the given partition.
+ * The space is reserved through allocateKVBuffer(), which spills when the
+ * partition is full.
+ *
+ * @return true on success; false if no buffer could be obtained
+ */
+bool MapOutputCollector::collect(const void * key, uint32_t keylen, const void * value,
+    uint32_t vallen, uint32_t partitionId) {
+  const uint32_t recordBytes = keylen + vallen + KVBuffer::headerLength();
+  KVBuffer * slot = allocateKVBuffer(partitionId, recordBytes);
+
+  if (NULL != slot) {
+    slot->fill(key, keylen, value, vallen);
+    return true;
+  }
+  return false;
+}
+
+/**
+ * Look up the key comparator for the map output key class.
+ *
+ * The comparator name is read from the config entry
+ * NATIVE_MAPOUT_KEY_COMPARATOR + "." + <key class>, where the key class
+ * comes from MAPRED_MAPOUTPUT_KEY_CLASS or, failing that,
+ * MAPRED_OUTPUT_KEY_CLASS. If no key class is configured the lookup is
+ * skipped and a NULL name is passed on, exactly as when the comparator entry
+ * itself is missing.
+ */
+ComparatorPtr MapOutputCollector::getComparator(Config * config, MapOutputSpec & spec) {
+  string nativeComparator = NATIVE_MAPOUT_KEY_COMPARATOR;
+  const char * key_class = config->get(MAPRED_MAPOUTPUT_KEY_CLASS);
+  if (NULL == key_class) {
+    key_class = config->get(MAPRED_OUTPUT_KEY_CLASS);
+  }
+
+  const char * comparatorName = NULL;
+  if (NULL != key_class) {
+    // appending a NULL const char * to a std::string is undefined behaviour,
+    // so the config key is only built when a key class is known
+    nativeComparator.append(".").append(key_class);
+    comparatorName = config->get(nativeComparator);
+  }
+  return NativeTask::get_comparator(spec.keyType, comparatorName);
+}
+
+/**
+ * @return the bucket of `partition`, or NULL if the id is out of range
+ */
+PartitionBucket * MapOutputCollector::getPartition(uint32_t partition) {
+  if (partition < _numPartitions) {
+    return _buckets[partition];
+  }
+  return NULL;
+}
+
+/**
+ * Sort every partition bucket and, when a writer is given, spill it.
+ *
+ * Buckets are sorted only for FULLORDER. With a non-NULL writer each
+ * partition is wrapped in startPartition()/endPartition(), even when its
+ * bucket is NULL.
+ *
+ * @param metric receives the accumulated sort time and the record count
+ * @throws UnsupportException for GROUPBY
+ */
+void MapOutputCollector::sortPartitions(SortOrder orderType, SortAlgorithm sortType,
+    IFileWriter * writer, SortMetrics & metric) {
+
+  if (GROUPBY == orderType) {
+    THROW_EXCEPTION(UnsupportException, "GROUPBY not supported");
+  }
+
+  const bool spilling = (NULL != writer);
+  const uint32_t partitionCount = _numPartitions;
+  uint64_t totalSortTime = 0;
+  Timer timer;
+  uint64_t totalRecords = 0;
+
+  for (uint32_t partition = 0; partition < partitionCount; partition++) {
+    if (spilling) {
+      writer->startPartition();
+    }
+
+    PartitionBucket * bucket = _buckets[partition];
+    if (NULL != bucket) {
+      totalRecords += bucket->getKVCount();
+      if (FULLORDER == orderType) {
+        timer.reset();
+        bucket->sort(sortType);
+        totalSortTime += timer.now() - timer.last();
+      }
+      if (spilling) {
+        bucket->spill(writer);
+      }
+    }
+
+    if (spilling) {
+      writer->endPartition();
+    }
+  }
+
+  metric.sortTime = totalSortTime;
+  metric.recordCount = totalRecords;
+}
+
+/**
+ * Sort the in-memory data and spill it to `spillOutput`.
+ *
+ * When `indexFilePath` is non-empty the spill index is written there;
+ * otherwise the spill is added to the list of intermediate spills.
+ * The buckets and the collect timer are reset afterwards.
+ *
+ * @throws IOException if `spillOutput` is empty
+ */
+void MapOutputCollector::middleSpill(const std::string & spillOutput,
+    const std::string & indexFilePath) {
+
+  uint64_t collecttime = _collectTimer.now() - _collectTimer.last();
+  const uint64_t M = 1000000; //million
+
+  if (spillOutput.empty()) {
+    THROW_EXCEPTION(IOException, "MapOutputCollector: Spill file path empty");
+  }
+
+  OutputStream * spillStream = FileSystem::getLocal().create(spillOutput, true);
+
+  // The writer is created without ownership of the stream; both are deleted
+  // explicitly at the end of this function.
+  IFileWriter * spillWriter = new IFileWriter(spillStream, _spec.checksumType, _spec.keyType,
+      _spec.valueType, _spec.codec, _spilledRecords);
+
+  Timer timer;
+  SortMetrics metrics;
+  sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, spillWriter, metrics);
+
+  SingleSpillInfo * info = spillWriter->getSpillInfo();
+  info->path = spillOutput;
+  uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime;
+
+  LOG(
+      "[MapOutputCollector::mid_spill] Sort and spill: {spilled file path: %s, id: %d, collect: %llu ms, sort: %llu ms, spill: %llu ms, records: %llu, uncompressed total bytes: %llu, compressed total bytes: %llu}",
+      info->path.c_str(), _spillInfos.getSpillCount(), collecttime / M, metrics.sortTime / M, spillTime / M,
+      metrics.recordCount, info->getEndPosition(), info->getRealEndPosition());
+
+  if (indexFilePath.length() > 0) {
+    // final output: persist the index, the info object is no longer needed
+    info->writeSpillInfo(indexFilePath);
+    delete info;
+  } else {
+    // intermediate spill: keep it for the final merge
+    _spillInfos.add(info);
+  }
+
+  delete spillWriter;
+  delete spillStream;
+
+  reset();
+  _collectTimer.reset();
+}
+
+/**
+ * Produce the final map output: merge all previously spilled files with the
+ * data still in memory into `filepath` and write the index to
+ * `idx_file_path`. With no previous spill this is a plain middleSpill().
+ */
+void MapOutputCollector::finalSpill(const std::string & filepath,
+    const std::string & idx_file_path) {
+
+  const uint64_t M = 1000000; //million
+  LOG("[MapOutputCollector::final_merge_and_spill] Spilling file path: %s", filepath.c_str());
+
+  const bool nothingSpilledYet = (_spillInfos.getSpillCount() == 0);
+  if (nothingSpilledYet) {
+    middleSpill(filepath, idx_file_path);
+    return;
+  }
+
+  IFileWriter * output = IFileWriter::create(filepath, _spec, _spilledRecords);
+  Merger * merger = new Merger(output, _config, _keyComparator, _combineRunner);
+
+  // one merge entry per spill file
+  size_t spillIdx = 0;
+  while (spillIdx < _spillInfos.getSpillCount()) {
+    SingleSpillInfo * spilled = _spillInfos.getSingleSpillInfo(spillIdx);
+    MergeEntryPtr fileEntry = IFileMergeEntry::create(spilled);
+    merger->addMergeEntry(fileEntry);
+    spillIdx++;
+  }
+
+  // sort what is still in memory (no writer: nothing is spilled here) and
+  // add it as one more merge entry
+  SortMetrics metrics;
+  sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics);
+  LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: %llu ms, records: %llu}",
+      metrics.sortTime / M, metrics.recordCount);
+
+  merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions));
+
+  Timer timer;
+  merger->merge();
+  LOG(
+      "[MapOutputCollector::final_merge_and_spill] Merge and Spill:{spilled file id: %d, merge and spill time: %llu ms}",
+      _spillInfos.getSpillCount(), (timer.now() - timer.last()) / M);
+
+  delete merger;
+
+  // write index
+  SingleSpillInfo * finalInfo = output->getSpillInfo();
+  finalInfo->writeSpillInfo(idx_file_path);
+  delete finalInfo;
+
+  _spillInfos.deleteAllSpillFiles();
+  delete output;
+  reset();
+}
+
+/**
+ * Finish collecting: fetch the output and index paths from the spill output
+ * service and run the final spill.
+ *
+ * @throws IOException if either path is missing or empty
+ */
+void MapOutputCollector::close() {
+  string * outputpath = _spillOutput->getOutputPath();
+  string * indexpath = _spillOutput->getOutputIndexPath();
+
+  // Copy the paths and free the heap strings immediately: a NULL result is
+  // treated like an empty path instead of being dereferenced, and nothing is
+  // leaked when the check below or the final spill throws.
+  const string output = (NULL != outputpath) ? *outputpath : string();
+  const string index = (NULL != indexpath) ? *indexpath : string();
+  delete outputpath;
+  delete indexpath;
+
+  if (output.empty() || index.empty()) {
+    THROW_EXCEPTION(IOException, "Illegal(empty) map output file/index path");
+  }
+
+  finalSpill(output, index);
+}
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MAP_OUTPUT_COLLECTOR_H_
+#define MAP_OUTPUT_COLLECTOR_H_
+
+#include "NativeTask.h"
+#include "MemoryPool.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+#include "PartitionBucket.h"
+#include "lib/SpillOutputService.h"
+
+namespace NativeTask {
+/**
+ * MapOutputCollector
+ */
+
+/**
+ * Counters reported by a sort pass (passed by reference to sortPartitions).
+ */
+struct SortMetrics {
+  uint64_t recordCount;  // logged as the number of records
+  uint64_t sortTime;     // elapsed time; callers divide by M before logging it as ms
+
+  // Both counters start at zero.
+  SortMetrics() {
+    recordCount = 0;
+    sortTime = 0;
+  }
+};
+
+/**
+ * ICombineRunner that wraps another ICombineRunner.
+ *
+ * The wrapped runner starts out as NULL. On destruction it is deleted unless
+ * it has been flagged as a Java combiner.
+ * NOTE(review): combine() and createCombiner() are defined elsewhere;
+ * presumably the wrapped runner is created on first use - confirm.
+ */
+class CombineRunnerWrapper : public ICombineRunner {
+private:
+  Config * _config;
+  ICombineRunner * _combineRunner;
+  bool _isJavaCombiner;
+  bool _combinerInited;
+  SpillOutputService * _spillOutput;
+
+public:
+  /**
+   * @param config  configuration kept for later use by this wrapper
+   * @param service spill output service kept for later use by this wrapper
+   */
+  CombineRunnerWrapper(Config * config, SpillOutputService * service)
+      // Listed in declaration order, which is the order members are
+      // actually initialized in.
+      : _config(config), _combineRunner(NULL), _isJavaCombiner(false), _combinerInited(false),
+        _spillOutput(service) {
+  }
+
+  ~CombineRunnerWrapper() {
+    // Only a runner not flagged as a Java combiner is released here.
+    if (!_isJavaCombiner) {
+      delete _combineRunner;
+    }
+  }
+
+  virtual void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer);
+
+private:
+  ICombineRunner * createCombiner();
+};
+
+/**
+ * Collects map output key/value pairs per partition and writes them out
+ * through intermediate and final spills (see middleSpill / finalSpill).
+ */
+class MapOutputCollector {
+  static const uint32_t DEFAULT_MIN_BLOCK_SIZE = 16 * 1024;
+  static const uint32_t DEFAULT_MAX_BLOCK_SIZE = 4 * 1024 * 1024;
+
+private:
+  Config * _config;
+
+  uint32_t _numPartitions;
+  PartitionBucket ** _buckets;
+
+  ComparatorPtr _keyComparator;
+
+  ICombineRunner * _combineRunner;
+
+  Counter * _spilledRecords;
+  SpillOutputService * _spillOutput;
+
+  uint32_t _defaultBlockSize;
+
+  SpillInfos _spillInfos;
+
+  MapOutputSpec _spec;
+
+  Timer _collectTimer;
+
+  MemoryPool * _pool;
+
+public:
+  MapOutputCollector(uint32_t num_partition, SpillOutputService * spillService);
+
+  ~MapOutputCollector();
+
+  void configure(Config * config);
+
+  /**
+   * collect one k/v pair
+   * @return true success; false buffer full, need spill
+   */
+  bool collect(const void * key, uint32_t keylen, const void * value, uint32_t vallen,
+      uint32_t partitionId);
+
+  KVBuffer * allocateKVBuffer(uint32_t partitionId, uint32_t kvlength);
+
+  /**
+   * Run the final spill into the output/index paths supplied by the spill
+   * output service.
+   */
+  void close();
+
+private:
+  void init(uint32_t maxBlockSize, uint32_t memory_capacity, ComparatorPtr keyComparator,
+      Counter * spilledRecord, ICombineRunner * combiner);
+
+  void reset();
+
+  /**
+   * spill a range of partition buckets, prepare for future
+   * Parallel sort & spill, TODO: parallel sort & spill
+   */
+  void sortPartitions(SortOrder orderType, SortAlgorithm sortType, IFileWriter * writer,
+      SortMetrics & metrics);
+
+  ComparatorPtr getComparator(Config * config, MapOutputSpec & spec);
+
+  /**
+   * Round v up to the next multiple of unit (unit must be non-zero).
+   */
+  inline uint32_t GetCeil(uint32_t v, uint32_t unit) {
+    return ((v + unit - 1) / unit) * unit;
+  }
+
+  /**
+   * Pick the default block size: a quarter of one partition's even share of
+   * the memory, rounded up to a multiple of DEFAULT_MIN_BLOCK_SIZE and
+   * capped at maxBlockSize.
+   *
+   * @param memoryCapacity total bytes available
+   * @param partitionNum   number of partitions sharing the memory; must be
+   *                       non-zero
+   * @param maxBlockSize   upper bound for the result
+   */
+  uint32_t getDefaultBlockSize(uint32_t memoryCapacity, uint32_t partitionNum,
+      uint32_t maxBlockSize) {
+    // Honour the partitionNum argument instead of silently reading the
+    // _numPartitions member.
+    uint32_t defaultBlockSize = memoryCapacity / partitionNum / 4;
+    defaultBlockSize = GetCeil(defaultBlockSize, DEFAULT_MIN_BLOCK_SIZE);
+    defaultBlockSize = std::min(defaultBlockSize, maxBlockSize);
+    return defaultBlockSize;
+  }
+
+  PartitionBucket * getPartition(uint32_t partition);
+
+  /**
+   * normal spill use options in _config
+   * @param spillOutput spill file path
+   * @param indexFilePath path of the index file for this spill
+   */
+  void middleSpill(const std::string & spillOutput, const std::string & indexFilePath);
+
+  /**
+   * final merge and/or spill use options in _config, and
+   * previous spilled file & in-memory data
+   */
+  void finalSpill(const std::string & filepath, const std::string & indexpath);
+};
+
+} //namespace NativeTask
+
+#endif /* MAP_OUTPUT_COLLECTOR_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "MapOutputSpec.h"
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+/**
+ * Fill spec from the job configuration.
+ *
+ * Does nothing when config is NULL. Otherwise sets the checksum type, sort
+ * algorithm, compression codec, sort order and key/value types.
+ *
+ * @throws IOException if compression is enabled without a codec, or if the
+ *         key or value class cannot be found in the configuration
+ */
+void MapOutputSpec::getSpecFromConfig(Config * config, MapOutputSpec & spec) {
+  if (NULL == config) {
+    return;
+  }
+  spec.checksumType = CHECKSUM_CRC32;
+
+  // Anything other than "DUALPIVOTSORT" selects CPPSORT.
+  string sortType = config->get(NATIVE_SORT_TYPE, "DUALPIVOTSORT");
+  if (sortType == "DUALPIVOTSORT") {
+    spec.sortAlgorithm = DUALPIVOTSORT;
+  } else {
+    spec.sortAlgorithm = CPPSORT;
+  }
+
+  if (config->get(MAPRED_COMPRESS_MAP_OUTPUT, "false") == "true") {
+    // The single-argument get() returns NULL for a missing key; assigning
+    // NULL to a std::string is undefined behaviour, so fail explicitly.
+    const char * codec = config->get(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC);
+    if (NULL == codec) {
+      THROW_EXCEPTION(IOException, "map output compression enabled but no compression codec set");
+    }
+    spec.codec = codec;
+  } else {
+    spec.codec = "";
+  }
+
+  if (config->getBool(MAPRED_SORT_AVOID, false)) {
+    spec.sortOrder = NOSORT;
+  } else {
+    spec.sortOrder = FULLORDER;
+  }
+
+  // Map-output specific class names win; the job output class is the fallback.
+  const char * key_class = config->get(MAPRED_MAPOUTPUT_KEY_CLASS);
+  if (NULL == key_class) {
+    key_class = config->get(MAPRED_OUTPUT_KEY_CLASS);
+  }
+  if (NULL == key_class) {
+    THROW_EXCEPTION(IOException, "mapred.mapoutput.key.class not set");
+  }
+  spec.keyType = JavaClassToKeyValueType(key_class);
+
+  const char * value_class = config->get(MAPRED_MAPOUTPUT_VALUE_CLASS);
+  if (NULL == value_class) {
+    value_class = config->get(MAPRED_OUTPUT_VALUE_CLASS);
+  }
+  if (NULL == value_class) {
+    THROW_EXCEPTION(IOException, "mapred.mapoutput.value.class not set");
+  }
+  spec.valueType = JavaClassToKeyValueType(value_class);
+}
+
+} // namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MAPOUTPUTSPEC_H_
+#define MAPOUTPUTSPEC_H_
+
+#include <string>
+#include "Checksum.h"
+#include "WritableUtils.h"
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+using std::string;
+
+/**
+ * internal sort method
+ * CPPSORT: std::sort
+ * DUALPIVOTSORT: DualPivotQuicksort; selected by getSpecFromConfig when the
+ * sort type setting is absent or equals "DUALPIVOTSORT"
+ * CQSORT: not handled by MemoryBlock::sort, which throws UnsupportException
+ * for any value other than the two above
+ */
+enum SortAlgorithm {
+ CQSORT = 0,
+ CPPSORT = 1,
+ DUALPIVOTSORT = 2,
+};
+
+/**
+ * Spill file type.
+ * INTERMEDIATE: a simple key/value sequence file
+ * IFILE: classic Hadoop IFile
+ */
+enum OutputFileType {
+ INTERMEDIATE = 0,
+ IFILE = 1,
+};
+
+/**
+ * key/value record order requirements
+ * FULLORDER: hadoop standard; used unless sort avoidance is configured
+ * GROUPBY: same key are grouped together, but not in order
+ * NOSORT: no order at all; selected by getSpecFromConfig when
+ * MAPRED_SORT_AVOID is true
+ */
+enum SortOrder {
+ FULLORDER = 0,
+ GROUPBY = 1,
+ NOSORT = 2,
+};
+
+/**
+ * Compression kinds.
+ * NOTE(review): presumably PLAIN means uncompressed and SNAPPY means
+ * Snappy-compressed data - confirm against the code that uses this enum.
+ */
+enum CompressionType {
+ PLAIN = 0,
+ SNAPPY = 1,
+};
+
+/**
+ * Settings that describe how map output is typed, sorted, compressed and
+ * checksummed. The class declares no constructor, so the non-string fields
+ * are not initialized by the class itself; getSpecFromConfig() assigns them.
+ */
+class MapOutputSpec {
+public:
+ // key type, derived from the configured map output key class
+ KeyValueType keyType;
+ // value type, derived from the configured map output value class
+ KeyValueType valueType;
+ SortOrder sortOrder;
+ SortAlgorithm sortAlgorithm;
+ // compression codec name; empty string when map output compression is off
+ string codec;
+ ChecksumType checksumType;
+
+ /**
+ * Fill spec from config. Returns without touching spec when config is NULL.
+ */
+ static void getSpecFromConfig(Config * config, MapOutputSpec & spec);
+};
+
+} // namespace NativeTask
+
+#endif /* MAPOUTPUTSPEC_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NativeTask.h"
+#include "commons.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+#include "MapOutputSpec.h"
+
+#include "MemoryBlock.h"
+#include "MemoryPool.h"
+#include "util/DualPivotQuickSort.h"
+
+namespace NativeTask {
+
+class MemoryPool;
+
+/**
+ * Wrap size bytes starting at pos. The block starts empty (write position 0,
+ * no recorded offsets) and is marked as not sorted.
+ */
+MemoryBlock::MemoryBlock(char * pos, uint32_t size)
+    : _base(pos),
+      _size(size),
+      _position(0),
+      _sorted(false) {
+}
+
+/**
+ * Return the index-th record allocated from this block, following the
+ * current order of the offset table, or NULL when index is out of range.
+ */
+KVBuffer * MemoryBlock::getKVBuffer(int index) {
+  if (index >= 0 && index < _kvOffsets.size()) {
+    // Offsets are relative to the start of the block.
+    const uint32_t recordOffset = _kvOffsets.at(index);
+    return (KVBuffer*)(_base + recordOffset);
+  }
+  return NULL;
+}
+
+/**
+ * Sort the record offsets of this block by key with the given algorithm.
+ *
+ * Nothing is sorted when the block is already marked sorted or holds fewer
+ * than two records; in every non-throwing case the block ends up marked
+ * sorted.
+ *
+ * @throws UnsupportException for algorithms other than CPPSORT and
+ *         DUALPIVOTSORT (only when a sort is actually needed)
+ */
+void MemoryBlock::sort(SortAlgorithm type, ComparatorPtr comparator) {
+  const bool needsSort = !_sorted && _kvOffsets.size() > 1;
+  if (needsSort) {
+    if (CPPSORT == type) {
+      std::sort(_kvOffsets.begin(), _kvOffsets.end(), ComparatorForStdSort(_base, comparator));
+    } else if (DUALPIVOTSORT == type) {
+      DualPivotQuicksort(_kvOffsets, ComparatorForDualPivotSort(_base, comparator));
+    } else {
+      THROW_EXCEPTION(UnsupportException, "Sort Algorithm not support");
+    }
+  }
+  _sorted = true;
+}
+} //namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "commons.h"
+
+#ifndef MEMORYBLOCK_H_
+#define MEMORYBLOCK_H_
+
+namespace NativeTask {
+
+class MemoryPool;
+
+/**
+ * Three-way comparator over record offsets, handed to DualPivotQuicksort.
+ * Each offset is resolved against a base address to find the KVBuffer whose
+ * key is compared.
+ */
+class ComparatorForDualPivotSort {
+private:
+  const char * _base;
+  ComparatorPtr _keyComparator;
+
+public:
+  ComparatorForDualPivotSort(const char * base, ComparatorPtr comparator)
+      : _base(base), _keyComparator(comparator) {
+  }
+
+  /**
+   * @return the key comparator's result for the records at offsets lhs and rhs
+   */
+  inline int operator()(uint32_t lhs, uint32_t rhs) {
+    KVBuffer * const first = (KVBuffer *)(_base + lhs);
+    KVBuffer * const second = (KVBuffer *)(_base + rhs);
+    const int order =
+        (*_keyComparator)(first->content, first->keyLength, second->content, second->keyLength);
+    return order;
+  }
+};
+
+/**
+ * "Less than" predicate over record offsets, handed to std::sort.
+ * Each offset is resolved against a base address to find the KVBuffer whose
+ * key is compared.
+ */
+class ComparatorForStdSort {
+private:
+  const char * _base;
+  ComparatorPtr _keyComparator;
+
+public:
+  ComparatorForStdSort(const char * base, ComparatorPtr comparator)
+      : _base(base), _keyComparator(comparator) {
+  }
+
+  /**
+   * @return true when the key at offset lhs orders before the key at rhs
+   */
+  inline bool operator()(uint32_t lhs, uint32_t rhs) {
+    KVBuffer * const first = (KVBuffer *)(_base + lhs);
+    KVBuffer * const second = (KVBuffer *)(_base + rhs);
+    return (*_keyComparator)(first->getKey(), first->keyLength, second->getKey(),
+        second->keyLength) < 0;
+  }
+};
+
+/**
+ * A fixed-size region of memory from which KVBuffer records are carved
+ * sequentially. The start offset of every record is remembered so the
+ * records can later be sorted and fetched by index.
+ */
+class MemoryBlock {
+private:
+  char * _base;                      // start of the region
+  uint32_t _size;                    // region size in bytes
+  uint32_t _position;                // offset of the next free byte
+  std::vector<uint32_t> _kvOffsets;  // start offset of each allocated record
+  bool _sorted;                      // cleared by every successful allocation
+
+public:
+  MemoryBlock(char * pos, uint32_t size);
+
+  bool sorted() const {
+    return _sorted;
+  }
+
+  /**
+   * Reserve length bytes for one record.
+   * @return the reserved space, or NULL when fewer than length bytes remain
+   */
+  KVBuffer * allocateKVBuffer(uint32_t length) {
+    const uint32_t remain = remainSpace();
+    if (length > remain) {
+      // Both values are unsigned 32-bit, so they are formatted with %u.
+      LOG("Unable to allocate kv from memory buffer, length: %u, remain: %u", length, remain);
+      return NULL;
+    }
+    _sorted = false;
+    _kvOffsets.push_back(_position);
+    char * space = _base + _position;
+    _position += length;
+    return (KVBuffer *)space;
+  }
+
+  /** Bytes still available in this block. */
+  uint32_t remainSpace() const {
+    return _size - _position;
+  }
+
+  /** Number of records allocated so far. */
+  uint32_t getKVCount() const {
+    return static_cast<uint32_t>(_kvOffsets.size());
+  }
+
+  KVBuffer * getKVBuffer(int index);
+
+  void sort(SortAlgorithm type, ComparatorPtr comparator);
+};
+//class MemoryBlock
+
+/**
+ * Forward iterator over the records of one MemoryBlock, in the block's
+ * current offset order. The record count is captured at construction.
+ */
+class MemBlockIterator {
+private:
+ MemoryBlock * _memBlock;
+ uint32_t _end;
+ uint32_t _current;
+ KVBuffer * _kvBuffer;
+
+public:
+
+ MemBlockIterator(MemoryBlock * memBlock)
+ : _memBlock(memBlock), _end(0), _current(0), _kvBuffer(NULL) {
+ _end = memBlock->getKVCount();
+ }
+
+ // Current record; NULL until next() has returned true at least once.
+ KVBuffer * getKVBuffer() {
+ return _kvBuffer;
+ }
+
+ /**
+ * move to next key/value
+ * @return true if a record is now current,
+ * false if there are no more records (the current record is unchanged)
+ */
+ bool next() {
+ if (_current >= _end) {
+ return false;
+ }
+ this->_kvBuffer = _memBlock->getKVBuffer(_current);
+ ++_current;
+ return true;
+ }
+};
+//class MemoryBlockIterator
+
+typedef MemBlockIterator * MemBlockIteratorPtr;
+
+/**
+ * Orders MemBlockIterators by the key of their current record.
+ */
+class MemBlockComparator {
+private:
+  ComparatorPtr _keyComparator;
+
+public:
+  MemBlockComparator(ComparatorPtr comparator)
+      : _keyComparator(comparator) {
+  }
+
+  /**
+   * @return true when lhs's current key orders before rhs's current key.
+   *         An iterator without a current record (NULL buffer) counts as
+   *         larger than any real record.
+   */
+  bool operator()(const MemBlockIteratorPtr lhs, const MemBlockIteratorPtr rhs) {
+    KVBuffer * const first = lhs->getKVBuffer();
+    KVBuffer * const second = rhs->getKVBuffer();
+
+    // NULL acts as +infinity: never "less than" anything ...
+    if (NULL == first) {
+      return false;
+    }
+    // ... and every real record is "less than" it.
+    if (NULL == second) {
+      return true;
+    }
+
+    const int order =
+        (*_keyComparator)(first->content, first->keyLength, second->content, second->keyLength);
+    return order < 0;
+  }
+};
+
+} //namespace NativeTask
+
+#endif /* MEMORYBLOCK_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MEMORYPOOL_H_
+#define MEMORYPOOL_H_
+
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "NativeTask.h"
+#include "util/StringUtil.h"
+
+namespace NativeTask {
+
+/**
+ * Class for allocating memory buffer
+ */
+
+/**
+ * A single malloc'ed buffer handed out front to back.
+ * Invariant: _used <= _capacity.
+ */
+class MemoryPool {
+private:
+  char * _base;
+  uint32_t _capacity;
+  uint32_t _used;
+
+public:
+
+  MemoryPool()
+      : _base(NULL), _capacity(0), _used(0) {
+  }
+
+  ~MemoryPool() {
+    if (NULL != _base) {
+      free(_base);
+      _base = NULL;
+    }
+  }
+
+  /**
+   * Make sure the pool holds at least capacity bytes and mark it empty.
+   * The buffer is reallocated only when it has to grow.
+   *
+   * @throws OutOfMemoryException if the buffer cannot be allocated; the pool
+   *         is then left empty with zero capacity
+   */
+  void init(uint32_t capacity) throw (OutOfMemoryException) {
+    if (capacity > _capacity) {
+      if (NULL != _base) {
+        free(_base);
+        _base = NULL;
+      }
+      // Until the new buffer exists the pool must report no space, so a
+      // failed malloc cannot leave a NULL base with a stale capacity.
+      _capacity = 0;
+      _used = 0;
+      _base = (char*)malloc(capacity);
+      if (NULL == _base) {
+        THROW_EXCEPTION(OutOfMemoryException, "Not enough memory to init MemoryBlockPool");
+      }
+      _capacity = capacity;
+    }
+    reset();
+  }
+
+  /** Mark the whole buffer as free again; the buffer itself is kept. */
+  void reset() {
+    _used = 0;
+  }
+
+  /**
+   * Take the next chunk from the pool.
+   *
+   * @param min       smallest acceptable size in bytes
+   * @param expect    preferred size in bytes
+   * @param allocated set to the size actually granted: expect when it fits,
+   *                  otherwise min; left untouched on failure
+   * @return start of the chunk, or NULL when not even min bytes remain
+   */
+  char * allocate(uint32_t min, uint32_t expect, uint32_t & allocated) {
+    // Compare against the remaining space rather than computing
+    // _used + size, which can wrap around in 32 bits.
+    const uint32_t remaining = _capacity - _used;
+    if (min > remaining) {
+      return NULL;
+    }
+    const uint32_t granted = (expect > remaining) ? min : expect;
+    char * buff = _base + _used;
+    allocated = granted;
+    _used += granted;
+    return buff;
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* MEMORYPOOL_H_ */