Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC

svn commit: r1611413 [10/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Combiner.h"
+#include "StringUtil.h"
+
+namespace NativeTask {
+
+NativeCombineRunner::NativeCombineRunner(Config * config, ObjectCreatorFunc combinerCreator)
+    : _config(config), _combinerCreator(combinerCreator), _keyGroupCount(0) {
+  if (NULL == _combinerCreator) {
+    THROW_EXCEPTION_EX(UnsupportException, "Create combiner failed");
+  }
+}
+
+KeyGroupIterator * NativeCombineRunner::createKeyGroupIterator(KVIterator * iter) {
+  return new KeyGroupIteratorImpl(iter);
+}
+
+void NativeCombineRunner::combine(CombineContext context, KVIterator * iterator,
+    IFileWriter * writer) {
+  Configurable * combiner = (Configurable *)(_combinerCreator());
+  if (NULL == combiner) {
+    THROW_EXCEPTION(IOException, "combiner creator returned NULL");
+  }
+  combiner->configure(_config);
+  NativeObjectType type = combiner->type();
+  switch (type) {
+  case MapperType: {
+    Mapper * mapper = (Mapper*)combiner;
+    mapper->setCollector(writer);
+
+    Buffer key;
+    Buffer value;
+    while (iterator->next(key, value)) {
+      mapper->map(key.data(), key.length(), value.data(), value.length());
+    }
+    mapper->close();
+    delete mapper;
+  }
+    break;
+  case ReducerType: {
+    Reducer * reducer = (Reducer*)combiner;
+    reducer->setCollector(writer);
+    KeyGroupIterator * kg = createKeyGroupIterator(iterator);
+    while (kg->nextKey()) {
+      _keyGroupCount++;
+      reducer->reduce(*kg);
+    }
+    reducer->close();
+    delete reducer;
+  }
+    break;
+  default:
+    THROW_EXCEPTION(UnsupportException, "Combiner type not support");
+  }
+}
+
+} /* namespace NativeTask */
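
For orientation, here is a minimal sketch of a Reducer-style combiner of the kind this runner drives. It relies only on the KeyGroupIterator and collector usage visible above; the decimal value encoding and the collect()/StringUtil::ToString helpers are illustrative assumptions, not part of this commit.

  // Hedged sketch: a summing combiner, assuming Reducer exposes a
  // collect(key, keyLen, value, valueLen) helper wired to the collector
  // installed via setCollector() above.
  class SumCombiner : public Reducer {
  public:
    virtual void reduce(KeyGroupIterator & input) {
      uint32_t keyLen = 0;
      const char * key = input.getKey(keyLen);
      int64_t sum = 0;
      uint32_t valLen = 0;
      const char * val = NULL;
      while (NULL != (val = input.nextValue(valLen))) {
        // values assumed to be decimal text, e.g. "12"
        sum += strtoll(std::string(val, valLen).c_str(), NULL, 10);
      }
      std::string out = StringUtil::ToString(sum); // assumed helper
      collect(key, keyLen, out.data(), out.length());
    }
  };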

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Combiner.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef COMBINER_H_
+#define COMBINER_H_
+#include "commons.h"
+#include "IFile.h"
+
+namespace NativeTask {
+
+class MemoryBufferKVIterator : public KVIterator {
+public:
+  virtual const char * getBase() = 0;
+  virtual std::vector<uint32_t> * getKVOffsets() = 0;
+};
+
+enum CombineContextType {
+  UNKNOWN = 0,
+  CONTINUOUS_MEMORY_BUFFER = 1,
+};
+
+class CombineContext {
+
+private:
+  CombineContextType _type;
+
+public:
+  CombineContext(CombineContextType type)
+      : _type(type) {
+  }
+
+public:
+  CombineContextType getType() {
+    return _type;
+  }
+};
+
+class CombineInMemory : public CombineContext {
+public:
+  CombineInMemory()
+      : CombineContext(CONTINUOUS_MEMORY_BUFFER) {}
+};
+
+class ICombineRunner {
+public:
+  ICombineRunner() {
+  }
+
+  virtual void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) = 0;
+
+  virtual ~ICombineRunner() {
+  }
+};
+
+class NativeCombineRunner : public ICombineRunner {
+private:
+  Config * _config;
+  ObjectCreatorFunc _combinerCreator;
+  uint32_t _keyGroupCount;
+
+public:
+  NativeCombineRunner(Config * config, ObjectCreatorFunc objectCreator);
+
+public:
+  void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer);
+
+private:
+  KeyGroupIterator * createKeyGroupIterator(KVIterator * iter);
+};
+
+} /* namespace NativeTask */
+#endif /* COMBINER_H_ */
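
As a usage sketch of the interface, a pass-through ICombineRunner that forwards every pair unchanged (Buffer's data()/length() and KVIterator::next(Buffer &, Buffer &) follow the usage in Combiner.cc above):

  class PassThroughCombineRunner : public ICombineRunner {
  public:
    virtual void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) {
      Buffer key;
      Buffer value;
      // no combining: copy each key/value pair straight through to the writer
      while (kvIterator->next(key, value)) {
        writer->write(key.data(), key.length(), value.data(), value.length());
      }
    }
  };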

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "SyncUtils.h"
+#include "Compressions.h"
+#include "codec/GzipCodec.h"
+#include "codec/SnappyCodec.h"
+#include "codec/Lz4Codec.h"
+
+namespace NativeTask {
+
+CompressStream::~CompressStream() {
+}
+
+void CompressStream::writeDirect(const void * buff, uint32_t length) {
+  THROW_EXCEPTION(UnsupportException, "writeDirect not support");
+}
+
+///////////////////////////////////////////////////////////
+
+DecompressStream::~DecompressStream() {
+}
+
+int32_t DecompressStream::readDirect(void * buff, uint32_t length) {
+  THROW_EXCEPTION(UnsupportException, "readDirect not support");
+}
+
+///////////////////////////////////////////////////////////
+
+const Compressions::Codec Compressions::GzipCodec = Compressions::Codec(
+    "org.apache.hadoop.io.compress.GzipCodec", ".gz");
+const Compressions::Codec Compressions::SnappyCodec = Compressions::Codec(
+    "org.apache.hadoop.io.compress.SnappyCodec", ".snappy");
+const Compressions::Codec Compressions::Lz4Codec = Compressions::Codec(
+    "org.apache.hadoop.io.compress.Lz4Codec", ".lz4");
+
+vector<Compressions::Codec> Compressions::SupportedCodecs = vector<Compressions::Codec>();
+
+void Compressions::initCodecs() {
+  static Lock lock;
+  ScopeLock<Lock> autolock(lock);
+  if (SupportedCodecs.size() == 0) {
+    SupportedCodecs.push_back(GzipCodec);
+    SupportedCodecs.push_back(SnappyCodec);
+    SupportedCodecs.push_back(Lz4Codec);
+  }
+}
+
+bool Compressions::support(const string & codec) {
+  initCodecs();
+  for (size_t i = 0; i < SupportedCodecs.size(); i++) {
+    if (codec == SupportedCodecs[i].name) {
+      return true;
+    }
+  }
+  return false;
+}
+
+const string Compressions::getExtension(const string & codec) {
+  initCodecs();
+  for (size_t i = 0; i < SupportedCodecs.size(); i++) {
+    if (codec == SupportedCodecs[i].name) {
+      return SupportedCodecs[i].extension;
+    }
+  }
+  return string();
+}
+
+const string Compressions::getCodec(const string & extension) {
+  initCodecs();
+  for (size_t i = 0; i < SupportedCodecs.size(); i++) {
+    if (extension == SupportedCodecs[i].extension) {
+      return SupportedCodecs[i].name;
+    }
+  }
+  return string();
+}
+
+const string Compressions::getCodecByFile(const string & file) {
+  initCodecs();
+  for (size_t i = 0; i < SupportedCodecs.size(); i++) {
+    const string & extension = SupportedCodecs[i].extension;
+    if ((file.length() > extension.length())
+        && (file.substr(file.length() - extension.length()) == extension)) {
+      return SupportedCodecs[i].name;
+    }
+  }
+  return string();
+}
+
+CompressStream * Compressions::getCompressionStream(const string & codec, OutputStream * stream,
+    uint32_t bufferSizeHint) {
+  if (codec == GzipCodec.name) {
+    return new GzipCompressStream(stream, bufferSizeHint);
+  }
+  if (codec == SnappyCodec.name) {
+    return new SnappyCompressStream(stream, bufferSizeHint);
+  }
+  if (codec == Lz4Codec.name) {
+    return new Lz4CompressStream(stream, bufferSizeHint);
+  }
+  return NULL;
+}
+
+DecompressStream * Compressions::getDecompressionStream(const string & codec, InputStream * stream,
+    uint32_t bufferSizeHint) {
+  if (codec == GzipCodec.name) {
+    return new GzipDecompressStream(stream, bufferSizeHint);
+  }
+  if (codec == SnappyCodec.name) {
+    return new SnappyDecompressStream(stream, bufferSizeHint);
+  }
+  if (codec == Lz4Codec.name) {
+    return new Lz4DecompressStream(stream, bufferSizeHint);
+  }
+  return NULL;
+}
+
+} // namespace NativeTask
+
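
A hedged usage sketch of the factory: compressing a short message to a local file with the Snappy codec (the file name and 64KB buffer hint are illustrative; FileOutputStream is declared in FileSystem.h below):

  FileOutputStream fout("demo.snappy");
  CompressStream * cs = Compressions::getCompressionStream(
      Compressions::SnappyCodec.name, &fout, 64 * 1024);
  if (NULL != cs) {
    const char * msg = "hello nativetask";
    cs->write(msg, 16);   // write() inherited from the output stream chain
    cs->finish();         // flush any buffered compressed data
    delete cs;
  }
  fout.close();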

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMPRESSIONS_H_
+#define COMPRESSIONS_H_
+
+#include <string>
+#include <vector>
+#include "Streams.h"
+
+namespace NativeTask {
+
+using std::vector;
+using std::string;
+
+class CompressStream : public FilterOutputStream {
+public:
+  CompressStream(OutputStream * stream)
+      : FilterOutputStream(stream) {
+  }
+
+  virtual ~CompressStream();
+
+  virtual void writeDirect(const void * buff, uint32_t length);
+
+  virtual void finish() {
+    flush();
+  }
+
+  virtual void resetState() {
+
+  }
+
+  virtual uint64_t compressedBytesWritten() {
+    return 0;
+  }
+};
+
+class DecompressStream : public FilterInputStream {
+public:
+  DecompressStream(InputStream * stream)
+      : FilterInputStream(stream) {
+  }
+
+  virtual ~DecompressStream();
+
+  virtual int32_t readDirect(void * buff, uint32_t length);
+
+  virtual uint64_t compressedBytesRead() {
+    return 0;
+  }
+};
+
+class Compressions {
+protected:
+  class Codec {
+  public:
+    string name;
+    string extension;
+
+    Codec(const string & name, const string & extension)
+        : name(name), extension(extension) {
+    }
+  };
+
+  static vector<Codec> SupportedCodecs;
+
+  static void initCodecs();
+
+public:
+  static const Codec GzipCodec;
+  static const Codec SnappyCodec;
+  static const Codec Lz4Codec;
+
+public:
+  static bool support(const string & codec);
+
+  static const string getExtension(const string & codec);
+
+  static const string getCodec(const string & extension);
+
+  static const string getCodecByFile(const string & file);
+
+  static CompressStream * getCompressionStream(const string & codec, OutputStream * stream,
+      uint32_t bufferSizeHint);
+
+  static DecompressStream * getDecompressionStream(const string & codec, InputStream * stream,
+      uint32_t bufferSizeHint);
+};
+
+} // namespace NativeTask
+
+#endif /* COMPRESSIONS_H_ */
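
Name and extension lookups, for quick reference (the sample file name is hypothetical):

  string codec = Compressions::getCodecByFile("map_0.out.lz4");
  // codec == "org.apache.hadoop.io.compress.Lz4Codec"
  string ext = Compressions::getExtension(codec);   // ".lz4"
  bool ok = Compressions::support(codec);           // true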

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Constants.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONSTANTS_H_
+#define CONSTANTS_H_
+
+#include <stdint.h>
+
+const uint32_t SIZE_OF_PARTITION_LENGTH = sizeof(uint32_t);
+const uint32_t SIZE_OF_KEY_LENGTH = sizeof(uint32_t);
+const uint32_t SIZE_OF_VALUE_LENGTH = sizeof(uint32_t);
+const uint32_t SIZE_OF_KV_LENGTH = SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+
+#endif //CONSTANTS_H_
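
These constants describe the fixed-length header preceding each serialized key/value record, so the total footprint of a record is the two length fields plus the payload. For example, assuming this layout, a 5-byte key with a 9-byte value occupies:

  // 4 (key length field) + 4 (value length field) + 5 + 9 = 22 bytes
  uint32_t recordSize = SIZE_OF_KV_LENGTH + 5 + 9;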

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <errno.h>
+#include <dirent.h>
+#include <sys/stat.h>
+#include <jni.h>
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "jniutils.h"
+#include "NativeTask.h"
+#include "TaskCounters.h"
+#include "NativeObjectFactory.h"
+#include "Path.h"
+#include "FileSystem.h"
+
+namespace NativeTask {
+
+/////////////////////////////////////////////////////////////
+
+FileInputStream::FileInputStream(const string & path) {
+  _handle = fopen(path.c_str(), "rb");
+  if (_handle != NULL) {
+    _fd = fileno(_handle);
+    _path = path;
+  } else {
+    _fd = -1;
+    THROW_EXCEPTION_EX(IOException, "Can't open raw file: [%s]", path.c_str());
+  }
+  _bytesRead = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP,
+      TaskCounters::FILE_BYTES_READ);
+}
+
+FileInputStream::~FileInputStream() {
+  close();
+}
+
+void FileInputStream::seek(uint64_t position) {
+  ::lseek(_fd, position, SEEK_SET);
+}
+
+uint64_t FileInputStream::tell() {
+  return ::lseek(_fd, 0, SEEK_CUR);
+}
+
+int32_t FileInputStream::read(void * buff, uint32_t length) {
+  int32_t ret = ::read(_fd, buff, length);
+  if (ret > 0) {
+    _bytesRead->increase(ret);
+  }
+  return ret;
+}
+
+void FileInputStream::close() {
+  if (_handle != NULL) {
+    fclose(_handle);
+    _handle = NULL;
+    _fd = -1;
+  }
+}
+
+/////////////////////////////////////////////////////////////
+
+FileOutputStream::FileOutputStream(const string & path, bool overwrite) {
+  // note: "wb" always truncates, so the overwrite flag is effectively unused here
+  _handle = fopen(path.c_str(), "wb");
+  if (_handle != NULL) {
+    _fd = fileno(_handle);
+    _path = path;
+  } else {
+    _fd = -1;
+    THROW_EXCEPTION_EX(IOException, "Open raw file failed: [%s]", path.c_str());
+  }
+  _bytesWrite = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP,
+      TaskCounters::FILE_BYTES_WRITTEN);
+}
+
+FileOutputStream::~FileOutputStream() {
+  close();
+}
+
+uint64_t FileOutputStream::tell() {
+  return ::lseek(_fd, 0, SEEK_CUR);
+}
+
+void FileOutputStream::write(const void * buff, uint32_t length) {
+  // compare in signed space first: ::write returns -1 on error
+  ssize_t written = ::write(_fd, buff, length);
+  if (written < 0 || (uint32_t)written < length) {
+    THROW_EXCEPTION(IOException, "::write error");
+  }
+  _bytesWrite->increase(length);
+}
+
+void FileOutputStream::flush() {
+}
+
+void FileOutputStream::close() {
+  if (_handle != NULL) {
+    fclose(_handle);
+    _handle = NULL;
+    _fd = -1;
+  }
+}
+
+/////////////////////////////////////////////////////////////
+
+class RawFileSystem : public FileSystem {
+protected:
+  string getRealPath(const string & path) {
+    if (StringUtil::StartsWith(path, "file:")) {
+      return path.substr(5);
+    }
+    return path;
+  }
+public:
+  InputStream * open(const string & path) {
+    return new FileInputStream(getRealPath(path));
+  }
+
+  OutputStream * create(const string & path, bool overwrite) {
+    string np = getRealPath(path);
+    string parent = Path::GetParent(np);
+    if (parent.length() > 0) {
+      if (!exists(parent)) {
+        mkdirs(parent);
+      }
+    }
+    return new FileOutputStream(np, overwrite);
+  }
+
+  uint64_t getLength(const string & path) {
+    struct stat st;
+    if (::stat(getRealPath(path).c_str(), &st) != 0) {
+      char buff[256];
+      strerror_r(errno, buff, 256);
+      THROW_EXCEPTION(IOException,
+          StringUtil::Format("stat path %s failed, %s", path.c_str(), buff));
+    }
+    return st.st_size;
+  }
+
+  bool list(const string & path, vector<FileEntry> & status) {
+    DIR * dp;
+    struct dirent * dirp;
+    if ((dp = opendir(path.c_str())) == NULL) {
+      return false;
+    }
+
+    FileEntry temp;
+    while ((dirp = readdir(dp)) != NULL) {
+      temp.name = dirp->d_name;
+      temp.isDirectory = (dirp->d_type == DT_DIR);
+      if (temp.name == "." || temp.name == "..") {
+        continue;
+      }
+      status.push_back(temp);
+    }
+    closedir(dp);
+    return true;
+  }
+
+  void remove(const string & path) {
+    if (!exists(path)) {
+      LOG("[FileSystem] remove file %s not exists, ignore", path.c_str());
+      return;
+    }
+    if (::remove(getRealPath(path).c_str()) != 0) {
+      int err = errno;
+      if (::system(StringUtil::Format("rm -rf %s", path.c_str()).c_str()) == 0) {
+        return;
+      }
+      char buff[256];
+      strerror_r(err, buff, 256);
+      THROW_EXCEPTION(IOException,
+          StringUtil::Format("FileSystem: remove path %s failed, %s", path.c_str(), buff));
+    }
+  }
+
+  bool exists(const string & path) {
+    struct stat st;
+    if (::stat(getRealPath(path).c_str(), &st) != 0) {
+      return false;
+    }
+    return true;
+  }
+
+  int mkdirs(const string & path, mode_t nmode) {
+    string np = getRealPath(path);
+    struct stat sb;
+
+    if (stat(np.c_str(), &sb) == 0) {
+      if (!S_ISDIR(sb.st_mode)) {
+        return 1;
+      }
+      return 0;
+    }
+
+    string npathstr = np;
+    char * npath = const_cast<char*>(npathstr.c_str());
+
+    /* Skip leading slashes. */
+    char * p = npath;
+    while (*p == '/')
+      p++;
+
+    while (NULL != (p = strchr(p, '/'))) {
+      *p = '\0';
+      if (stat(npath, &sb) != 0) {
+        if (mkdir(npath, nmode)) {
+          return 1;
+        }
+      } else if (!S_ISDIR(sb.st_mode)) {
+        return 1;
+      }
+      *p++ = '/'; /* restore slash */
+      while (*p == '/')
+        p++;
+    }
+
+    /* Create the final directory component. */
+    if (stat(npath, &sb) && mkdir(npath, nmode)) {
+      return 1;
+    }
+    return 0;
+  }
+
+  void mkdirs(const string & path) {
+    int ret = mkdirs(path, 0755);
+    if (ret != 0) {
+      THROW_EXCEPTION_EX(IOException, "mkdirs [%s] failed", path.c_str());
+    }
+  }
+};
+
+///////////////////////////////////////////////////////////
+
+extern RawFileSystem RawFileSystemInstance;
+
+RawFileSystem RawFileSystemInstance = RawFileSystem();
+
+string FileSystem::getDefaultFsUri(Config * config) {
+  const char * nm = config->get(FS_DEFAULT_NAME);
+  if (nm == NULL) {
+    nm = config->get("fs.defaultFS");
+  }
+  if (nm == NULL) {
+    return string("file:///");
+  } else {
+    return string(nm);
+  }
+}
+
+FileSystem & FileSystem::getLocal() {
+  return RawFileSystemInstance;
+}
+
+FileSystem & FileSystem::get(Config * config) {
+  string uri = getDefaultFsUri(config);
+  if (uri == "file:///") {
+    return RawFileSystemInstance;
+  }
+  // falling off the end of a value-returning function is undefined behavior,
+  // so reject filesystems we cannot handle explicitly
+  THROW_EXCEPTION_EX(UnsupportException, "Unsupported filesystem: %s", uri.c_str());
+}
+
+} // namespace NativeTask
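
A short round-trip sketch against the local filesystem facade (paths are illustrative; the InputStream/OutputStream write, read, and close signatures follow their uses above):

  FileSystem & fs = FileSystem::getLocal();
  OutputStream * out = fs.create("file:/tmp/nativetask-demo/data.bin", true);
  out->write("test", 4);
  out->close();
  delete out;

  uint64_t len = fs.getLength("file:/tmp/nativetask-demo/data.bin");   // 4
  InputStream * in = fs.open("file:/tmp/nativetask-demo/data.bin");
  char buff[4];
  in->read(buff, 4);
  in->close();
  delete in;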

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/FileSystem.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FILESYSTEM_H_
+#define FILESYSTEM_H_
+
+#include <string>
+#include "NativeTask.h"
+#include "Streams.h"
+
+namespace NativeTask {
+
+class FileSystem;
+
+/**
+ * Local raw filesystem file input stream
+ * with blocking semantics
+ */
+class FileInputStream : public InputStream {
+private:
+  string _path;
+  FILE * _handle;
+  int _fd;
+  Counter * _bytesRead;
+public:
+  FileInputStream(const string & path);
+  virtual ~FileInputStream();
+
+  virtual void seek(uint64_t position);
+
+  virtual uint64_t tell();
+
+  virtual int32_t read(void * buff, uint32_t length);
+
+  virtual void close();
+};
+
+/**
+ * Local raw filesystem file output stream
+ * with blocking semantics
+ */
+class FileOutputStream : public OutputStream {
+private:
+  string _path;
+  FILE * _handle;
+  int _fd;
+  Counter * _bytesWrite;
+public:
+  FileOutputStream(const string & path, bool overwrite = true);
+  virtual ~FileOutputStream();
+
+  virtual uint64_t tell();
+
+  virtual void write(const void * buff, uint32_t length);
+
+  virtual void flush();
+
+  virtual void close();
+};
+
+
+class FileEntry {
+public:
+  string name;
+  bool isDirectory;
+};
+
+/**
+ * FileSystem interface
+ */
+class FileSystem {
+protected:
+  FileSystem() {
+  }
+public:
+  virtual ~FileSystem() {
+  }
+
+  virtual InputStream * open(const string & path) {
+    return NULL;
+  }
+
+  virtual OutputStream * create(const string & path, bool overwrite = true) {
+    return NULL;
+  }
+
+  virtual uint64_t getLength(const string & path) {
+    return 0;
+  }
+
+  virtual bool list(const string & path, vector<FileEntry> & status) {
+    return false;
+  }
+
+  virtual void remove(const string & path) {
+  }
+
+  virtual bool exists(const string & path) {
+    return false;
+  }
+
+  virtual void mkdirs(const string & path) {
+  }
+
+  static string getDefaultFsUri(Config * config);
+  static FileSystem & getLocal();
+  static FileSystem & getJava(Config * config);
+  static FileSystem & get(Config * config);
+};
+
+} // namespace NativeTask
+
+#endif /* FILESYSTEM_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "IFile.h"
+#include "Compressions.h"
+#include "lib/FileSystem.h"
+
+namespace NativeTask {
+
+///////////////////////////////////////////////////////////
+
+IFileReader::IFileReader(InputStream * stream, SingleSpillInfo * spill, bool deleteInputStream)
+    : _deleteSourceStream(deleteInputStream), _stream(stream), _source(NULL),
+        _checksumType(spill->checkSumType), _kType(spill->keyType), _vType(spill->valueType),
+        _codec(spill->codec), _segmentIndex(-1), _spillInfo(spill), _valuePos(NULL), _valueLen(0) {
+  _source = new ChecksumInputStream(_stream, _checksumType);
+  _source->setLimit(0);
+  _reader.init(128 * 1024, _source, _codec);
+}
+
+IFileReader::~IFileReader() {
+
+  delete _source;
+  _source = NULL;
+
+  if (_deleteSourceStream) {
+    delete _stream;
+    _stream = NULL;
+  }
+}
+
+/**
+ * @return true if there is a next partition, false if no more partitions
+ */
+bool IFileReader::nextPartition() {
+  if (0 != _source->getLimit()) {
+    THROW_EXCEPTION(IOException, "bad ifile segment length");
+  }
+  if (_segmentIndex >= 0) {
+    // verify checksum
+    uint32_t chsum = 0;
+    if (4 != _stream->readFully(&chsum, 4)) {
+      THROW_EXCEPTION(IOException, "read ifile checksum failed");
+    }
+    uint32_t actual = bswap(chsum);
+    uint32_t expect = _source->getChecksum();
+    if (actual != expect) {
+      THROW_EXCEPTION_EX(IOException, "read ifile checksum not match, actual %x expect %x", actual,
+          expect);
+    }
+  }
+  _segmentIndex++;
+  if (_segmentIndex < (int)(_spillInfo->length)) {
+    int64_t end_pos = (int64_t)_spillInfo->segments[_segmentIndex].realEndOffset;
+    if (_segmentIndex > 0) {
+      end_pos -= (int64_t)_spillInfo->segments[_segmentIndex - 1].realEndOffset;
+    }
+    if (end_pos < 0) {
+      THROW_EXCEPTION(IOException, "bad ifile format");
+    }
+    // exclude checksum
+    _source->setLimit(end_pos - 4);
+    _source->resetChecksum();
+    return true;
+  } else {
+    return false;
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+IFileWriter * IFileWriter::create(const std::string & filepath, const MapOutputSpec & spec,
+    Counter * spilledRecords) {
+  OutputStream * fout = FileSystem::getLocal().create(filepath, true);
+  IFileWriter * writer = new IFileWriter(fout, spec.checksumType, spec.keyType, spec.valueType,
+      spec.codec, spilledRecords, true);
+  return writer;
+}
+
+IFileWriter::IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype,
+    KeyValueType vtype, const string & codec, Counter * counter, bool deleteTargetStream)
+    : _deleteTargetStream(deleteTargetStream), _stream(stream), _dest(NULL),
+        _checksumType(checksumType), _kType(ktype), _vType(vtype), _codec(codec),
+        _recordCounter(counter) {
+  _dest = new ChecksumOutputStream(_stream, _checksumType);
+  _appendBuffer.init(128 * 1024, _dest, _codec);
+}
+
+IFileWriter::~IFileWriter() {
+  delete _dest;
+  _dest = NULL;
+
+  if (_deleteTargetStream) {
+    delete _stream;
+    _stream = NULL;
+  }
+}
+
+void IFileWriter::startPartition() {
+  _spillFileSegments.push_back(IFileSegment());
+  _dest->resetChecksum();
+}
+
+void IFileWriter::endPartition() {
+  char EOFMarker[2] = {-1, -1};
+  _appendBuffer.write(EOFMarker, 2);
+  _appendBuffer.flush();
+
+  CompressStream * compressionStream = _appendBuffer.getCompressionStream();
+  if (NULL != compressionStream) {
+    compressionStream->finish();
+    compressionStream->resetState();
+  }
+
+  uint32_t chsum = _dest->getChecksum();
+  chsum = bswap(chsum);
+  _stream->write(&chsum, sizeof(chsum));
+  _stream->flush();
+  IFileSegment * info = &(_spillFileSegments[_spillFileSegments.size() - 1]);
+  info->uncompressedEndOffset = _appendBuffer.getCounter();
+  info->realEndOffset = _stream->tell();
+}
+
+void IFileWriter::write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) {
+  // append KeyLength ValueLength KeyBytesLength
+  uint32_t keyBuffLen = keyLen;
+  uint32_t valBuffLen = valueLen;
+  switch (_kType) {
+  case TextType:
+    keyBuffLen += WritableUtils::GetVLongSize(keyLen);
+    break;
+  case BytesType:
+    keyBuffLen += 4;
+    break;
+  default:
+    break;
+  }
+
+  switch (_vType) {
+  case TextType:
+    valBuffLen += WritableUtils::GetVLongSize(valueLen);
+    break;
+  case BytesType:
+    valBuffLen += 4;
+    break;
+  default:
+    break;
+  }
+
+  _appendBuffer.write_vuint2(keyBuffLen, valBuffLen);
+
+  switch (_kType) {
+  case TextType:
+    _appendBuffer.write_vuint(keyLen);
+    break;
+  case BytesType:
+    _appendBuffer.write_uint32_be(keyLen);
+    break;
+  default:
+    break;
+  }
+
+  if (keyLen > 0) {
+    _appendBuffer.write(key, keyLen);
+  }
+
+  if (NULL != _recordCounter) {
+    _recordCounter->increase();
+  }
+
+  switch (_vType) {
+  case TextType:
+    _appendBuffer.write_vuint(valueLen);
+    break;
+  case BytesType:
+    _appendBuffer.write_uint32_be(valueLen);
+    break;
+  default:
+    break;
+  }
+  if (valueLen > 0) {
+    _appendBuffer.write(value, valueLen);
+  }
+}
+
+IFileSegment * IFileWriter::toArray(std::vector<IFileSegment> *segments) {
+  IFileSegment * segs = new IFileSegment[segments->size()];
+  for (size_t i = 0; i < segments->size(); i++) {
+    segs[i] = segments->at(i);
+  }
+  return segs;
+}
+
+SingleSpillInfo * IFileWriter::getSpillInfo() {
+  const uint32_t size = _spillFileSegments.size();
+  return new SingleSpillInfo(toArray(&_spillFileSegments), size, "", _checksumType, _kType, _vType,
+      _codec);
+}
+
+void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) {
+  if (_spillFileSegments.size() > 0) {
+    offset = _spillFileSegments[_spillFileSegments.size() - 1].uncompressedEndOffset;
+    realOffset = _spillFileSegments[_spillFileSegments.size() - 1].realEndOffset;
+  } else {
+    offset = 0;
+    realOffset = 0;
+  }
+}
+
+} // namespace NativeTask
+
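
Putting the writer together, a hedged sketch of spilling one partition to local disk (CHECKSUM_CRC32 is assumed to be a valid ChecksumType value; the path and record are illustrative):

  OutputStream * fout = FileSystem::getLocal().create("file:/tmp/spill0.out", true);
  IFileWriter * writer = new IFileWriter(fout, CHECKSUM_CRC32, TextType, TextType,
      "", NULL, true);   // no codec, no record counter, writer owns the stream
  writer->startPartition();
  writer->write("key", 3, "value", 5);
  writer->endPartition();
  SingleSpillInfo * info = writer->getSpillInfo();   // describes the segments just written
  delete writer;   // also deletes fout because deleteTargetStream is true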

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/IFile.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef IFILE_H_
+#define IFILE_H_
+
+#include "Checksum.h"
+#include "Buffers.h"
+#include "WritableUtils.h"
+#include "SpillInfo.h"
+#include "MapOutputSpec.h"
+
+namespace NativeTask {
+
+/**
+ * IFileReader
+ */
+class IFileReader {
+private:
+  InputStream * _stream;
+  ChecksumInputStream * _source;
+  ReadBuffer _reader;
+  ChecksumType _checksumType;
+  KeyValueType _kType;
+  KeyValueType _vType;
+  string _codec;
+  int32_t _segmentIndex;
+  SingleSpillInfo * _spillInfo;
+  const char * _valuePos;
+  uint32_t _valueLen;
+  bool _deleteSourceStream;
+
+public:
+  IFileReader(InputStream * stream, SingleSpillInfo * spill, bool deleteSourceStream = false);
+
+  virtual ~IFileReader();
+
+  /**
+   * @return true if there is a next partition, false if no more partitions
+   */
+  bool nextPartition();
+
+  /**
+   * Get the next key in the current partition.
+   * Returns NULL if there are no more keys, in which case
+   * nextPartition() needs to be called next.
+   * NOTICE: the returned pointer remains valid at least until
+   *         value() is called.
+   */
+  const char * nextKey(uint32_t & keyLen) {
+    int64_t t1 = _reader.readVLong();
+    int64_t t2 = _reader.readVLong();
+    if (t1 == -1) {
+      return NULL;
+    }
+    const char * kvbuff = _reader.get((uint32_t)(t1 + t2));
+    uint32_t len;
+    switch (_kType) {
+    case TextType:
+      keyLen = WritableUtils::ReadVInt(kvbuff, len);
+      break;
+    case BytesType:
+      keyLen = bswap(*(uint32_t*)kvbuff);
+      len = 4;
+      break;
+    default:
+      keyLen = t1;
+      len = 0;
+    }
+    const char * kbuff = kvbuff + len;
+    const char * vbuff = kvbuff + (uint32_t)t1;
+    switch (_vType) {
+    case TextType:
+      _valueLen = WritableUtils::ReadVInt(vbuff, len);
+      _valuePos = vbuff + len;
+      break;
+    case BytesType:
+      _valueLen = bswap(*(uint32_t*)vbuff);
+      _valuePos = vbuff + 4;
+      break;
+    default:
+      _valueLen = t2;
+      _valuePos = vbuff;
+    }
+    return kbuff;
+  }
+
+  /**
+   * length of current value part of IFile entry
+   */
+  uint32_t valueLen() {
+    return _valueLen;
+  }
+
+  /**
+   * get current value
+   */
+  const char * value(uint32_t & valueLen) {
+    valueLen = _valueLen;
+    return _valuePos;
+  }
+};
+
+/**
+ * IFile Writer
+ */
+class IFileWriter : public Collector {
+protected:
+  OutputStream * _stream;
+  ChecksumOutputStream * _dest;
+  ChecksumType _checksumType;
+  KeyValueType _kType;
+  KeyValueType _vType;
+  string _codec;
+  AppendBuffer _appendBuffer;
+  vector<IFileSegment> _spillFileSegments;
+  Counter * _recordCounter;
+
+  bool _deleteTargetStream;
+
+private:
+  IFileSegment * toArray(std::vector<IFileSegment> *segments);
+
+public:
+  static IFileWriter * create(const std::string & filepath, const MapOutputSpec & spec,
+      Counter * spilledRecords);
+
+  IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype,
+      KeyValueType vtype, const string & codec, Counter * recordCounter,
+      bool deleteTargetStream = false);
+
+  virtual ~IFileWriter();
+
+  void startPartition();
+
+  void endPartition();
+
+  virtual void write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen);
+
+  SingleSpillInfo * getSpillInfo();
+
+  void getStatistics(uint64_t & offset, uint64_t & realOffset);
+
+  virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
+    write((const char*)key, keyLen, (const char*)value, valueLen);
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* IFILE_H_ */
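
The matching read loop, using only the methods declared above (stream and spill describe an existing IFile):

  IFileReader * reader = new IFileReader(stream, spill);
  while (reader->nextPartition()) {
    uint32_t keyLen = 0;
    const char * key = NULL;
    while (NULL != (key = reader->nextKey(keyLen))) {
      uint32_t valueLen = 0;
      const char * value = reader->value(valueLen);
      // consume key[0..keyLen) and value[0..valueLen) here
    }
  }
  delete reader;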

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Iterator.h"
+#include "commons.h"
+
+namespace NativeTask {
+
+KeyGroupIteratorImpl::KeyGroupIteratorImpl(KVIterator * iterator)
+    : _keyGroupIterState(NEW_KEY), _iterator(iterator), _first(true) {
+}
+
+bool KeyGroupIteratorImpl::nextKey() {
+  if (_keyGroupIterState == NO_MORE) {
+    return false;
+  }
+
+  uint32_t temp;
+  while (_keyGroupIterState == SAME_KEY || _keyGroupIterState == NEW_KEY_VALUE) {
+    nextValue(temp);
+  }
+  if (_keyGroupIterState == NEW_KEY) {
+    if (_first == true) {
+      _first = false;
+      if (!next()) {
+        _keyGroupIterState = NO_MORE;
+        return false;
+      }
+    }
+    _keyGroupIterState = NEW_KEY_VALUE;
+    _currentGroupKey.assign(_key.data(), _key.length());
+    return true;
+  }
+  return false;
+}
+
+const char * KeyGroupIteratorImpl::getKey(uint32_t & len) {
+  len = (uint32_t)_key.length();
+  return _key.data();
+}
+
+const char * KeyGroupIteratorImpl::nextValue(uint32_t & len) {
+  switch (_keyGroupIterState) {
+  case NEW_KEY: {
+    return NULL;
+  }
+  case SAME_KEY: {
+    if (next()) {
+      if (_key.length() == _currentGroupKey.length()) {
+        if (fmemeq(_key.data(), _currentGroupKey.c_str(), _key.length())) {
+          len = _value.length();
+          return _value.data();
+        }
+      }
+      _keyGroupIterState = NEW_KEY;
+      return NULL;
+    }
+    _keyGroupIterState = NO_MORE;
+    return NULL;
+  }
+  case NEW_KEY_VALUE: {
+    _keyGroupIterState = SAME_KEY;
+    len = _value.length();
+    return _value.data();
+  }
+  case NO_MORE:
+    return NULL;
+  }
+  return NULL;
+}
+
+bool KeyGroupIteratorImpl::next() {
+  bool result = _iterator->next(_key, _value);
+  return result;
+}
+
+} //namespace NativeTask
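
The intended traversal pattern, where kv is any KVIterator over key-sorted input (grouping assumes identical keys are adjacent):

  KeyGroupIteratorImpl groups(kv);
  while (groups.nextKey()) {
    uint32_t keyLen = 0;
    const char * key = groups.getKey(keyLen);
    uint32_t valLen = 0;
    const char * val = NULL;
    while (NULL != (val = groups.nextValue(valLen))) {
      // one value of the current key group
    }
  }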

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Iterator.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ITERATOR_H_
+#define ITERATOR_H_
+
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+class KeyGroupIteratorImpl : public KeyGroupIterator {
+protected:
+  // for KeyGroupIterator
+  KeyGroupIterState _keyGroupIterState;
+  KVIterator * _iterator;
+  string _currentGroupKey;
+  Buffer _key;
+  Buffer _value;
+  bool _first;
+
+public:
+  KeyGroupIteratorImpl(KVIterator * iterator);
+  bool nextKey();
+  const char * getKey(uint32_t & len);
+  const char * nextValue(uint32_t & len);
+
+protected:
+  bool next();
+};
+
+} //namespace NativeTask
+#endif

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Log.h"
+
+namespace NativeTask {
+
+#ifdef PRINT_LOG
+
+FILE * LOG_DEVICE = stderr;
+
+#endif
+
+} //namespace NativeTask
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Log.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LOG_H_
+#define LOG_H_
+
+#include <stdio.h>
+#include <time.h>
+
+namespace NativeTask {
+
+#define PRINT_LOG
+
+#ifdef PRINT_LOG
+
+extern FILE * LOG_DEVICE;
+#define LOG(_fmt_, args...)   if (LOG_DEVICE) { \
+    time_t log_timer; struct tm log_tm; \
+    time(&log_timer); localtime_r(&log_timer, &log_tm); \
+    fprintf(LOG_DEVICE, "%02d/%02d/%02d %02d:%02d:%02d INFO "_fmt_"\n", \
+    log_tm.tm_year%100, log_tm.tm_mon+1, log_tm.tm_mday, \
+    log_tm.tm_hour, log_tm.tm_min, log_tm.tm_sec, \
+    ##args);}
+
+#else
+
+#define LOG(_fmt_, args...)
+
+#endif
+
+} // namespace NativeTask
+
+#endif /* LOG_H_ */
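
The macro is printf-style; a hypothetical call and its output:

  LOG("[IFileWriter] spilled %u segments to %s", segmentCount, path.c_str());
  // 14/07/17 17:44:55 INFO [IFileWriter] spilled 3 segments to /tmp/spill0.out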

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "FileSystem.h"
+#include "NativeObjectFactory.h"
+#include "MapOutputCollector.h"
+#include "Merge.h"
+#include "NativeTask.h"
+#include "WritableUtils.h"
+#include "util/DualPivotQuickSort.h"
+#include "Combiner.h"
+#include "TaskCounters.h"
+#include "MinHeap.h"
+
+namespace NativeTask {
+
+ICombineRunner * CombineRunnerWrapper::createCombiner() {
+
+  ICombineRunner * combineRunner = NULL;
+  if (NULL != _config->get(NATIVE_COMBINER)) {
+    const char * combinerClass = _config->get(NATIVE_COMBINER);
+    ObjectCreatorFunc objectCreator = NativeObjectFactory::GetObjectCreator(combinerClass);
+    if (NULL == objectCreator) {
+      THROW_EXCEPTION_EX(UnsupportException, "Combiner not found: %s", combinerClass);
+    }
+    LOG("[MapOutputCollector::configure] native combiner is enabled: %s", combinerClass);
+    combineRunner = new NativeCombineRunner(_config, objectCreator);
+  } else {
+    CombineHandler * javaCombiner = _spillOutput->getJavaCombineHandler();
+    if (NULL != javaCombiner) {
+      _isJavaCombiner = true;
+      combineRunner = (ICombineRunner *)javaCombiner;
+    } else {
+      LOG("[MapOutputCollector::getCombiner] cannot get combine handler from java");
+    }
+  }
+  return combineRunner;
+}
+
+void CombineRunnerWrapper::combine(CombineContext type, KVIterator * kvIterator,
+    IFileWriter * writer) {
+
+  if (!_combinerInited) {
+    _combineRunner = createCombiner();
+    _combinerInited = true;
+  }
+
+  if (NULL != _combineRunner) {
+    _combineRunner->combine(type, kvIterator, writer);
+  } else {
+    LOG("[CombineRunnerWrapper::combine] no valid combiner");
+  }
+}
+
+/////////////////////////////////////////////////////////////////
+// MapOutputCollector
+/////////////////////////////////////////////////////////////////
+
+MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, SpillOutputService * spillService)
+    : _config(NULL), _buckets(NULL), _keyComparator(NULL), _defaultBlockSize(0),
+        _combineRunner(NULL), _spilledRecords(NULL), _spillOutput(spillService), _pool(NULL),
+        _numPartitions(numberPartitions) {
+  _pool = new MemoryPool();
+}
+
+MapOutputCollector::~MapOutputCollector() {
+
+  if (NULL != _buckets) {
+    for (uint32_t i = 0; i < _numPartitions; i++) {
+      if (NULL != _buckets[i]) {
+        delete _buckets[i];
+        _buckets[i] = NULL;
+      }
+    }
+  }
+
+  delete[] _buckets;
+  _buckets = NULL;
+
+  if (NULL != _pool) {
+    delete _pool;
+    _pool = NULL;
+  }
+
+  if (NULL != _combineRunner) {
+    delete _combineRunner;
+    _combineRunner = NULL;
+  }
+}
+
+void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity,
+    ComparatorPtr keyComparator, Counter * spilledRecords, ICombineRunner * combiner) {
+
+  this->_combineRunner = combiner;
+
+  this->_defaultBlockSize = defaultBlockSize;
+
+  _pool->init(memoryCapacity);
+
+  //TODO: add support for customized comparator
+  this->_keyComparator = keyComparator;
+
+  _buckets = new PartitionBucket*[_numPartitions];
+
+  for (uint32_t partitionId = 0; partitionId < _numPartitions; partitionId++) {
+    PartitionBucket * pb = new PartitionBucket(_pool, partitionId, keyComparator, _combineRunner,
+        defaultBlockSize);
+
+    _buckets[partitionId] = pb;
+  }
+
+  _spilledRecords = spilledRecords;
+
+  _collectTimer.reset();
+}
+
+void MapOutputCollector::reset() {
+  for (uint32_t i = 0; i < _numPartitions; i++) {
+    if (NULL != _buckets[i]) {
+      _buckets[i]->reset();
+    }
+  }
+  _pool->reset();
+}
+
+void MapOutputCollector::configure(Config * config) {
+  _config = config;
+  MapOutputSpec::getSpecFromConfig(config, _spec);
+
+  uint32_t maxBlockSize = config->getInt(NATIVE_SORT_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE);
+  uint32_t capacity = config->getInt(MAPRED_IO_SORT_MB, 300) * 1024 * 1024;
+
+  uint32_t defaultBlockSize = getDefaultBlockSize(capacity, _numPartitions, maxBlockSize);
+  LOG("Native Total MemoryBlockPool: num_partitions %u, min_block_size %uK, max_block_size %uK, capacity %uM", _numPartitions, defaultBlockSize / 1024,
+      maxBlockSize / 1024, capacity / 1024 / 1024);
+
+  ComparatorPtr comparator = getComparator(config, _spec);
+
+  Counter * spilledRecord = NativeObjectFactory::GetCounter(TaskCounters::TASK_COUNTER_GROUP,
+      TaskCounters::SPILLED_RECORDS);
+
+  ICombineRunner * combiner = NULL;
+  // check the combiner config keys for both the old and the new API
+  if (NULL != config->get(NATIVE_COMBINER)
+      || NULL != config->get(MAPRED_COMBINE_CLASS_OLD)
+      || NULL != config->get(MAPRED_COMBINE_CLASS_NEW)) {
+    combiner = new CombineRunnerWrapper(config, _spillOutput);
+  }
+
+  init(defaultBlockSize, capacity, comparator, spilledRecord, combiner);
+}
+
+KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) {
+  PartitionBucket * partition = getPartition(partitionId);
+  if (NULL == partition) {
+    THROW_EXCEPTION_EX(IOException, "Partition is NULL, partition_id: %d, num_partitions: %d", partitionId, _numPartitions);
+  }
+
+  KVBuffer * dest = partition->allocateKVBuffer(kvlength);
+
+  if (NULL == dest) {
+    string * spillpath = _spillOutput->getSpillPath();
+    if (NULL == spillpath || spillpath->length() == 0) {
+      delete spillpath;
+      THROW_EXCEPTION(IOException, "Illegal(empty) spill files path");
+    }
+    middleSpill(*spillpath, "");
+    delete spillpath;
+
+    dest = partition->allocateKVBuffer(kvlength);
+    if (NULL == dest) {
+      // io.sort.mb is too small: a single key/value pair does not fit
+      // even in a freshly spilled (empty) buffer
+      THROW_EXCEPTION(OutOfMemoryException, "key/value pair larger than io.sort.mb");
+    }
+  }
+  return dest;
+}
+
+/**
+ * collect one key/value pair
+ * @return true on success; false if the buffer is full and a spill is needed
+ */
+bool MapOutputCollector::collect(const void * key, uint32_t keylen, const void * value,
+    uint32_t vallen, uint32_t partitionId) {
+  uint32_t total_length = keylen + vallen + KVBuffer::headerLength();
+  KVBuffer * buff = allocateKVBuffer(partitionId, total_length);
+
+  if (NULL == buff) {
+    return false;
+  }
+  buff->fill(key, keylen, value, vallen);
+  return true;
+}
+
+ComparatorPtr MapOutputCollector::getComparator(Config * config, MapOutputSpec & spec) {
+  string nativeComparator = NATIVE_MAPOUT_KEY_COMPARATOR;
+  const char * keyClass = config->get(MAPRED_MAPOUTPUT_KEY_CLASS);
+  if (NULL == keyClass) {
+    keyClass = config->get(MAPRED_OUTPUT_KEY_CLASS);
+  }
+  if (NULL == keyClass) {
+    THROW_EXCEPTION(IOException, "map output key class not set");
+  }
+  nativeComparator.append(".").append(keyClass);
+  const char * comparatorName = config->get(nativeComparator);
+  return NativeTask::get_comparator(spec.keyType, comparatorName);
+}
+
+PartitionBucket * MapOutputCollector::getPartition(uint32_t partition) {
+  if (partition >= _numPartitions) {
+    return NULL;
+  }
+  return _buckets[partition];
+}
+
+/**
+ * Sort (and optionally spill) all partition buckets.
+ * When writer is NULL the buckets are only sorted; timing and record
+ * counts are reported through the SortMetrics argument.
+ */
+void MapOutputCollector::sortPartitions(SortOrder orderType, SortAlgorithm sortType,
+    IFileWriter * writer, SortMetrics & metric) {
+
+  uint32_t start_partition = 0;
+  uint32_t num_partition = _numPartitions;
+  if (orderType == GROUPBY) {
+    THROW_EXCEPTION(UnsupportException, "GROUPBY not supported");
+  }
+
+  uint64_t sortingTime = 0;
+  Timer timer;
+  uint64_t recordNum = 0;
+
+  for (uint32_t i = 0; i < num_partition; i++) {
+    if (NULL != writer) {
+      writer->startPartition();
+    }
+    PartitionBucket * pb = _buckets[start_partition + i];
+    if (pb != NULL) {
+      recordNum += pb->getKVCount();
+      if (orderType == FULLORDER) {
+        timer.reset();
+        pb->sort(sortType);
+        sortingTime += timer.now() - timer.last();
+      }
+      if (NULL != writer) {
+        pb->spill(writer);
+      }
+    }
+    if (NULL != writer) {
+      writer->endPartition();
+    }
+  }
+  metric.sortTime = sortingTime;
+  metric.recordCount = recordNum;
+}
+
+void MapOutputCollector::middleSpill(const std::string & spillOutput,
+    const std::string & indexFilePath) {
+
+  uint64_t collectTime = _collectTimer.now() - _collectTimer.last();
+  const uint64_t M = 1000000; // divisor converting timer ticks to the milliseconds logged below
+
+  if (spillOutput.empty()) {
+    THROW_EXCEPTION(IOException, "MapOutputCollector: Spill file path empty");
+  } else {
+    OutputStream * fout = FileSystem::getLocal().create(spillOutput, true);
+
+    IFileWriter * writer = new IFileWriter(fout, _spec.checksumType, _spec.keyType, _spec.valueType,
+        _spec.codec, _spilledRecords);
+
+    Timer timer;
+    SortMetrics metrics;
+    sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, writer, metrics);
+
+    SingleSpillInfo * info = writer->getSpillInfo();
+    info->path = spillOutput;
+    uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime;
+
+    LOG(
+        "[MapOutputCollector::middleSpill] Sort and spill: {spilled file path: %s, id: %d, collect: %llu ms, sort: %llu ms, spill: %llu ms, records: %llu, uncompressed total bytes: %llu, compressed total bytes: %llu}",
+        info->path.c_str(), _spillInfos.getSpillCount(), collectTime / M, metrics.sortTime / M,
+        spillTime / M, metrics.recordCount, info->getEndPosition(), info->getRealEndPosition());
+
+    if (indexFilePath.length() > 0) {
+      info->writeSpillInfo(indexFilePath);
+      delete info;
+    } else {
+      _spillInfos.add(info);
+    }
+
+    delete writer;
+    delete fout;
+
+    reset();
+    _collectTimer.reset();
+  }
+}
+
+/**
+ * final merge and/or spill: combines previously spilled
+ * files with the remaining in-memory data
+ */
+void MapOutputCollector::finalSpill(const std::string & filepath,
+    const std::string & indexFilePath) {
+
+  const uint64_t M = 1000000; // divisor converting timer ticks to the milliseconds logged below
+  LOG("[MapOutputCollector::finalSpill] Spilling file path: %s", filepath.c_str());
+
+  if (_spillInfos.getSpillCount() == 0) {
+    middleSpill(filepath, indexFilePath);
+    return;
+  }
+
+  IFileWriter * writer = IFileWriter::create(filepath, _spec, _spilledRecords);
+  Merger * merger = new Merger(writer, _config, _keyComparator, _combineRunner);
+
+  for (size_t i = 0; i < _spillInfos.getSpillCount(); i++) {
+    SingleSpillInfo * spill = _spillInfos.getSingleSpillInfo(i);
+    MergeEntryPtr pme = IFileMergeEntry::create(spill);
+    merger->addMergeEntry(pme);
+  }
+
+  SortMetrics metrics;
+  sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics);
+  LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: %llu ms, records: %llu}",
+      metrics.sortTime / M, metrics.recordCount);
+
+  merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions));
+
+  Timer timer;
+  merger->merge();
+  LOG(
+      "[MapOutputCollector::finalSpill] Merge and spill: {spilled file id: %d, merge and spill time: %llu ms}",
+      _spillInfos.getSpillCount(), (timer.now() - timer.last()) / M);
+
+  delete merger;
+
+  // write the index file for the final output
+  SingleSpillInfo * spillRange = writer->getSpillInfo();
+  spillRange->writeSpillInfo(indexFilePath);
+  delete spillRange;
+  _spillInfos.deleteAllSpillFiles();
+  delete writer;
+  reset();
+}
+
+void MapOutputCollector::close() {
+  string * outputpath = _spillOutput->getOutputPath();
+  string * indexpath = _spillOutput->getOutputIndexPath();
+
+  if (NULL == outputpath || NULL == indexpath
+      || outputpath->length() == 0 || indexpath->length() == 0) {
+    delete outputpath;
+    delete indexpath;
+    THROW_EXCEPTION(IOException, "Illegal(empty) map output file/index path");
+  }
+
+  finalSpill(*outputpath, *indexpath);
+
+  delete outputpath;
+  delete indexpath;
+}
+} // namespace NativeTask
+
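
For orientation, here is a minimal sketch of how a caller might drive the collector above. The wiring shown (who owns the Config, the SpillOutputService, and the partitioning) is an assumption for illustration; in this patch the collector is actually driven by the native handler code.

    #include "lib/MapOutputCollector.h"

    using namespace NativeTask;

    // Hypothetical driver, for illustration only: config and spillService are
    // assumed to be supplied by the surrounding task runtime.
    void runCollector(Config * config, SpillOutputService * spillService,
        uint32_t numPartitions, KVIterator * input) {
      MapOutputCollector collector(numPartitions, spillService);
      collector.configure(config);  // sizes the pool, picks comparator/combiner
      Buffer key;
      Buffer value;
      while (input->next(key, value)) {
        uint32_t partition = 0;  // a real caller derives this from the key
        collector.collect(key.data(), key.length(), value.data(), value.length(), partition);
      }
      collector.close();  // final sort/merge/spill plus the index file
    }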

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputCollector.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MAP_OUTPUT_COLLECTOR_H_
+#define MAP_OUTPUT_COLLECTOR_H_
+
+#include "NativeTask.h"
+#include "MemoryPool.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+#include "PartitionBucket.h"
+#include "lib/SpillOutputService.h"
+
+namespace NativeTask {
+/**
+ * MapOutputCollector
+ */
+
+struct SortMetrics {
+  uint64_t recordCount;
+  uint64_t sortTime;
+
+public:
+  SortMetrics()
+      : recordCount(0), sortTime(0) {
+  }
+};
+
+class CombineRunnerWrapper : public ICombineRunner {
+private:
+  Config * _config;
+  ICombineRunner * _combineRunner;
+  bool _isJavaCombiner;
+  bool _combinerInited;
+  SpillOutputService * _spillOutput;
+
+public:
+  CombineRunnerWrapper(Config * config, SpillOutputService * service)
+      : _config(config), _combineRunner(NULL), _isJavaCombiner(false), _combinerInited(false),
+          _spillOutput(service) {
+  }
+
+  ~CombineRunnerWrapper() {
+    if (!_isJavaCombiner) {
+      delete _combineRunner;
+    }
+  }
+
+  virtual void combine(CombineContext context, KVIterator * kvIterator, IFileWriter * writer);
+
+private:
+  ICombineRunner * createCombiner();
+};
+
+class MapOutputCollector {
+  static const uint32_t DEFAULT_MIN_BLOCK_SIZE = 16 * 1024;
+  static const uint32_t DEFAULT_MAX_BLOCK_SIZE = 4 * 1024 * 1024;
+
+private:
+  Config * _config;
+
+  uint32_t _numPartitions;
+  PartitionBucket ** _buckets;
+
+  ComparatorPtr _keyComparator;
+
+  ICombineRunner * _combineRunner;
+
+  Counter * _spilledRecords;
+  SpillOutputService * _spillOutput;
+
+  uint32_t _defaultBlockSize;
+
+  SpillInfos _spillInfos;
+
+  MapOutputSpec _spec;
+
+  Timer _collectTimer;
+
+  MemoryPool * _pool;
+
+public:
+  MapOutputCollector(uint32_t numPartitions, SpillOutputService * spillService);
+
+  ~MapOutputCollector();
+
+  void configure(Config * config);
+
+  /**
+   * collect one key/value pair
+   * @return true on success; false if the buffer is full and a spill is needed
+   */
+  bool collect(const void * key, uint32_t keylen, const void * value, uint32_t vallen,
+      uint32_t partitionId);
+
+  KVBuffer * allocateKVBuffer(uint32_t partitionId, uint32_t kvlength);
+
+  void close();
+
+private:
+  void init(uint32_t defaultBlockSize, uint32_t memoryCapacity, ComparatorPtr keyComparator,
+      Counter * spilledRecord, ICombineRunner * combiner);
+
+  void reset();
+
+  /**
+   * sort (and optionally spill) a range of partition buckets
+   * TODO: parallel sort & spill
+   */
+  void sortPartitions(SortOrder orderType, SortAlgorithm sortType, IFileWriter * writer,
+      SortMetrics & metrics);
+
+  ComparatorPtr getComparator(Config * config, MapOutputSpec & spec);
+
+  inline uint32_t GetCeil(uint32_t v, uint32_t unit) {
+    return ((v + unit - 1) / unit) * unit;
+  }
+
+  uint32_t getDefaultBlockSize(uint32_t memoryCapacity, uint32_t partitionNum,
+      uint32_t maxBlockSize) {
+    uint32_t defaultBlockSize = memoryCapacity / partitionNum / 4;
+    defaultBlockSize = GetCeil(defaultBlockSize, DEFAULT_MIN_BLOCK_SIZE);
+    defaultBlockSize = std::min(defaultBlockSize, maxBlockSize);
+    return defaultBlockSize;
+  }
+
+  PartitionBucket * getPartition(uint32_t partition);
+
+  /**
+   * normal (middle) spill, using options in _config
+   * @param spillOutput spill file path
+   * @param indexFilePath spill index file path
+   */
+  void middleSpill(const std::string & spillOutput, const std::string & indexFilePath);
+
+  /**
+   * final merge and/or spill, using options in _config and combining
+   * previously spilled files with in-memory data
+   */
+  void finalSpill(const std::string & filepath, const std::string & indexFilePath);
+};
+
+} //namespace NativeTask
+
+#endif /* MAP_OUTPUT_COLLECTOR_H_ */
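
As a worked example of getDefaultBlockSize() above: with io.sort.mb = 300 and 100 partitions, capacity is 314572800 bytes, so 314572800 / 100 / 4 = 786432; that is already a multiple of the 16 KB minimum and below the 4 MB cap, giving a 768 KB default block. A standalone restatement of the arithmetic:

    #include <algorithm>
    #include <stdint.h>
    #include <stdio.h>

    // Standalone restatement of getDefaultBlockSize(), for illustration.
    static uint32_t getCeil(uint32_t v, uint32_t unit) {
      return ((v + unit - 1) / unit) * unit;
    }

    int main() {
      const uint32_t minBlock = 16 * 1024;
      const uint32_t maxBlock = 4 * 1024 * 1024;
      uint32_t capacity = 300u * 1024 * 1024;  // io.sort.mb = 300
      uint32_t partitions = 100;
      uint32_t block = std::min(getCeil(capacity / partitions / 4, minBlock), maxBlock);
      printf("default block size: %u bytes\n", block);  // prints 786432 (768K)
      return 0;
    }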

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "MapOutputSpec.h"
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+void MapOutputSpec::getSpecFromConfig(Config * config, MapOutputSpec & spec) {
+  if (NULL == config) {
+    return;
+  }
+  spec.checksumType = CHECKSUM_CRC32;
+  string sortType = config->get(NATIVE_SORT_TYPE, "DUALPIVOTSORT");
+  if (sortType == "DUALPIVOTSORT") {
+    spec.sortAlgorithm = DUALPIVOTSORT;
+  } else {
+    spec.sortAlgorithm = CPPSORT;
+  }
+  if (config->get(MAPRED_COMPRESS_MAP_OUTPUT, "false") == "true") {
+    spec.codec = config->get(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC);
+  } else {
+    spec.codec = "";
+  }
+  if (config->getBool(MAPRED_SORT_AVOID, false)) {
+    spec.sortOrder = NOSORT;
+  } else {
+    spec.sortOrder = FULLORDER;
+  }
+  const char * key_class = config->get(MAPRED_MAPOUTPUT_KEY_CLASS);
+  if (NULL == key_class) {
+    key_class = config->get(MAPRED_OUTPUT_KEY_CLASS);
+  }
+  if (NULL == key_class) {
+    THROW_EXCEPTION(IOException, "mapred.mapoutput.key.class not set");
+  }
+  spec.keyType = JavaClassToKeyValueType(key_class);
+  const char * value_class = config->get(MAPRED_MAPOUTPUT_VALUE_CLASS);
+  if (NULL == value_class) {
+    value_class = config->get(MAPRED_OUTPUT_VALUE_CLASS);
+  }
+  if (NULL == value_class) {
+    THROW_EXCEPTION(IOException, "mapred.mapoutput.value.class not set");
+  }
+  spec.valueType = JavaClassToKeyValueType(value_class);
+}
+
+} // namespace NativeTask
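
A sketch of how getSpecFromConfig() maps configuration onto a MapOutputSpec. It assumes Config exposes a set(key, value) mutator; the exact key strings behind the constants are defined elsewhere in this patch.

    #include "lib/MapOutputSpec.h"

    using namespace NativeTask;

    // Illustration only; Config::set(...) is an assumed API.
    void buildSpec(Config & config) {
      config.set(MAPRED_COMPRESS_MAP_OUTPUT, "true");
      config.set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC,
          "org.apache.hadoop.io.compress.SnappyCodec");
      config.set(MAPRED_MAPOUTPUT_KEY_CLASS, "org.apache.hadoop.io.Text");
      config.set(MAPRED_MAPOUTPUT_VALUE_CLASS, "org.apache.hadoop.io.Text");

      MapOutputSpec spec;
      MapOutputSpec::getSpecFromConfig(&config, spec);
      // spec.sortAlgorithm == DUALPIVOTSORT (the default), spec.codec is the
      // snappy codec class, and spec.sortOrder == FULLORDER unless
      // MAPRED_SORT_AVOID is set to true.
    }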

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MapOutputSpec.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MAPOUTPUTSPEC_H_
+#define MAPOUTPUTSPEC_H_
+
+#include <string>
+#include "Checksum.h"
+#include "WritableUtils.h"
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+using std::string;
+
+/**
+ * internal sort method
+ */
+enum SortAlgorithm {
+  CQSORT = 0,
+  CPPSORT = 1,
+  DUALPIVOTSORT = 2,
+};
+
+/**
+ * spill file type
+ * INTERMEDIATE: a simple key/value sequence file
+ * IFILE: classic hadoop IFile
+ */
+enum OutputFileType {
+  INTERMEDIATE = 0,
+  IFILE = 1,
+};
+
+/**
+ * key/value record order requirements
+ * FULLORDER: standard hadoop total order
+ * GROUPBY:   equal keys are grouped together, but groups are unordered
+ * NOSORT:    no order at all
+ */
+enum SortOrder {
+  FULLORDER = 0,
+  GROUPBY = 1,
+  NOSORT = 2,
+};
+
+enum CompressionType {
+  PLAIN = 0,
+  SNAPPY = 1,
+};
+
+class MapOutputSpec {
+public:
+  KeyValueType keyType;
+  KeyValueType valueType;
+  SortOrder sortOrder;
+  SortAlgorithm sortAlgorithm;
+  string codec;
+  ChecksumType checksumType;
+
+  static void getSpecFromConfig(Config * config, MapOutputSpec & spec);
+};
+
+} // namespace NativeTask
+
+#endif /* MAPOUTPUTSPEC_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NativeTask.h"
+#include "commons.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+#include "MapOutputSpec.h"
+
+#include "MemoryBlock.h"
+#include "MemoryPool.h"
+#include "util/DualPivotQuickSort.h"
+
+namespace NativeTask {
+
+MemoryBlock::MemoryBlock(char * pos, uint32_t size)
+    : _base(pos), _size(size), _position(0), _sorted(false) {
+}
+
+KVBuffer * MemoryBlock::getKVBuffer(int index) {
+  if (index < 0 || (size_t)index >= _kvOffsets.size()) {
+    return NULL;
+  }
+  uint32_t offset = _kvOffsets.at(index);
+  KVBuffer * kvbuffer = (KVBuffer*)(_base + offset);
+  return kvbuffer;
+}
+
+void MemoryBlock::sort(SortAlgorithm type, ComparatorPtr comparator) {
+  if ((!_sorted) && (_kvOffsets.size() > 1)) {
+    switch (type) {
+    case CPPSORT:
+      std::sort(_kvOffsets.begin(), _kvOffsets.end(), ComparatorForStdSort(_base, comparator));
+      break;
+    case DUALPIVOTSORT: {
+      DualPivotQuicksort(_kvOffsets, ComparatorForDualPivotSort(_base, comparator));
+    }
+      break;
+    default:
+      THROW_EXCEPTION(UnsupportException, "Sort Algorithm not support");
+    }
+  }
+  _sorted = true;
+}
+} //namespace NativeTask
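
MemoryBlock::sort() above reorders offsets rather than records: the key/value bytes never move, only the index array is permuted. A self-contained illustration of that technique, with a simplified stand-in for KVBuffer:

    #include <algorithm>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <vector>

    // Simplified stand-in for KVBuffer: a fixed-size record in a flat pool.
    struct Rec {
      uint32_t keyLength;
      char key[4];
    };

    // Same idea as ComparatorForStdSort: compare records through their offsets.
    struct OffsetLess {
      const char * base;
      explicit OffsetLess(const char * b) : base(b) {}
      bool operator()(uint32_t lhs, uint32_t rhs) const {
        return strcmp(((const Rec *)(base + lhs))->key, ((const Rec *)(base + rhs))->key) < 0;
      }
    };

    int main() {
      char * pool = (char *)malloc(3 * sizeof(Rec));
      std::vector<uint32_t> offsets;
      const char * keys[] = {"bb", "aa", "cc"};
      for (uint32_t i = 0; i < 3; i++) {
        Rec * r = (Rec *)(pool + i * sizeof(Rec));
        r->keyLength = 2;
        strcpy(r->key, keys[i]);
        offsets.push_back(i * (uint32_t)sizeof(Rec));
      }
      // Records stay put; only the offset array is reordered.
      std::sort(offsets.begin(), offsets.end(), OffsetLess(pool));
      for (size_t i = 0; i < offsets.size(); i++) {
        printf("%s\n", ((const Rec *)(pool + offsets[i]))->key);  // aa bb cc
      }
      free(pool);
      return 0;
    }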

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryBlock.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "commons.h"
+
+#ifndef MEMORYBLOCK_H_
+#define MEMORYBLOCK_H_
+
+namespace NativeTask {
+
+class MemoryPool;
+
+class ComparatorForDualPivotSort {
+private:
+  const char * _base;
+  ComparatorPtr _keyComparator;
+public:
+  ComparatorForDualPivotSort(const char * base, ComparatorPtr comparator)
+      : _base(base), _keyComparator(comparator) {
+  }
+
+  inline int operator()(uint32_t lhs, uint32_t rhs) {
+    KVBuffer * left = (KVBuffer *)(_base + lhs);
+    KVBuffer * right = (KVBuffer *)(_base + rhs);
+    return (*_keyComparator)(left->content, left->keyLength, right->content, right->keyLength);
+  }
+};
+
+class ComparatorForStdSort {
+private:
+  const char * _base;
+  ComparatorPtr _keyComparator;
+public:
+  ComparatorForStdSort(const char * base, ComparatorPtr comparator)
+      : _base(base), _keyComparator(comparator) {
+  }
+
+  inline bool operator()(uint32_t lhs, uint32_t rhs) {
+    KVBuffer * left = (KVBuffer *)(_base + lhs);
+    KVBuffer * right = (KVBuffer *)(_base + rhs);
+    int ret = (*_keyComparator)(left->getKey(), left->keyLength, right->getKey(), right->keyLength);
+    return ret < 0;
+  }
+};
+
+class MemoryBlock {
+private:
+  char * _base;
+  uint32_t _size;
+  uint32_t _position;
+  std::vector<uint32_t> _kvOffsets;
+  bool _sorted;
+
+public:
+  MemoryBlock(char * pos, uint32_t size);
+
+  bool sorted() {
+    return _sorted;
+  }
+
+  KVBuffer * allocateKVBuffer(uint32_t length) {
+    if (length > remainSpace()) {
+      LOG("Unable to allocate kv from memory buffer, length: %d, remain: %d", length, remainSpace());
+      return NULL;
+    }
+    _sorted = false;
+    _kvOffsets.push_back(_position);
+    char * space = _base + _position;
+    _position += length;
+    return (KVBuffer *)space;
+  }
+
+  uint32_t remainSpace() const {
+    return _size - _position;
+  }
+
+  uint32_t getKVCount() {
+    return _kvOffsets.size();
+  }
+
+  KVBuffer * getKVBuffer(int index);
+
+  void sort(SortAlgorithm type, ComparatorPtr comparator);
+};
+//class MemoryBlock
+
+class MemBlockIterator {
+private:
+  MemoryBlock * _memBlock;
+  uint32_t _end;
+  uint32_t _current;
+  KVBuffer * _kvBuffer;
+
+public:
+
+  MemBlockIterator(MemoryBlock * memBlock)
+      : _memBlock(memBlock), _end(0), _current(0), _kvBuffer(NULL) {
+    _end = memBlock->getKVCount();
+  }
+
+  KVBuffer * getKVBuffer() {
+    return _kvBuffer;
+  }
+
+  /**
+   * move to the next key/value pair
+   * @return true on success; false when there are no more entries
+   */
+  bool next() {
+    if (_current >= _end) {
+      return false;
+    }
+    this->_kvBuffer = _memBlock->getKVBuffer(_current);
+    ++_current;
+    return true;
+  }
+};
+//class MemBlockIterator
+
+typedef MemBlockIterator * MemBlockIteratorPtr;
+
+class MemBlockComparator {
+private:
+  ComparatorPtr _keyComparator;
+
+public:
+  MemBlockComparator(ComparatorPtr comparator)
+      : _keyComparator(comparator) {
+  }
+
+  bool operator()(const MemBlockIteratorPtr lhs, const MemBlockIteratorPtr rhs) {
+
+    KVBuffer * left = lhs->getKVBuffer();
+    KVBuffer * right = rhs->getKVBuffer();
+
+    // Treat NULL as the largest possible key, so an exhausted iterator loses
+    // the comparison and the other entry is popped first
+    if (NULL == left) {
+      return false;
+    }
+
+    if (NULL == right) {
+      return true;
+    }
+
+    return (*_keyComparator)(left->content, left->keyLength, right->content, right->keyLength) < 0;
+  }
+};
+
+} //namespace NativeTask
+
+#endif /* MEMORYBLOCK_H_ */
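
To show how MemBlockIterator and MemBlockComparator fit together, here is a sketch of a k-way merge over sorted blocks. This is illustration only: the real merge path lives in PartitionBucket and the merger, which may use a heap rather than this linear scan. Each iterator must be primed with one next() call, and liveness is tracked through next()'s return value:

    #include <vector>
    #include "MemoryBlock.h"

    using namespace NativeTask;

    // Emit all key/value pairs from a set of sorted MemoryBlocks in key order.
    void mergeBlocks(std::vector<MemoryBlock *> & blocks, ComparatorPtr keyComparator) {
      MemBlockComparator less(keyComparator);
      std::vector<MemBlockIteratorPtr> live;
      for (size_t i = 0; i < blocks.size(); i++) {
        MemBlockIteratorPtr it = new MemBlockIterator(blocks[i]);
        if (it->next()) {   // prime the iterator
          live.push_back(it);
        } else {
          delete it;        // empty block
        }
      }
      while (!live.empty()) {
        size_t best = 0;
        for (size_t i = 1; i < live.size(); i++) {
          if (less(live[i], live[best])) {
            best = i;
          }
        }
        KVBuffer * kv = live[best]->getKVBuffer();
        (void)kv;  // ... emit kv to a writer here ...
        if (!live[best]->next()) {  // exhausted: retire this iterator
          delete live[best];
          live.erase(live.begin() + best);
        }
      }
    }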

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MemoryPool.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MEMORYPOOL_H_
+#define MEMORYPOOL_H_
+
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "NativeTask.h"
+#include "util/StringUtil.h"
+
+namespace NativeTask {
+
+/**
+ * Class for allocating memory buffer
+ */
+
+class MemoryPool {
+private:
+  char * _base;
+  uint32_t _capacity;
+  uint32_t _used;
+
+public:
+
+  MemoryPool()
+      : _base(NULL), _capacity(0), _used(0) {
+  }
+
+  ~MemoryPool() {
+    if (NULL != _base) {
+      free(_base);
+      _base = NULL;
+    }
+  }
+
+  void init(uint32_t capacity) throw (OutOfMemoryException) {
+    if (capacity > _capacity) {
+      if (NULL != _base) {
+        free(_base);
+        _base = NULL;
+      }
+      _base = (char*)malloc(capacity);
+      if (NULL == _base) {
+        THROW_EXCEPTION(OutOfMemoryException, "Not enough memory to init MemoryBlockPool");
+      }
+      _capacity = capacity;
+    }
+    reset();
+  }
+
+  void reset() {
+    _used = 0;
+  }
+
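+  /**
+   * Try to hand out expect bytes, fall back to min bytes when the
+   * remaining space is short, and return NULL once even min bytes no
+   * longer fit. The size actually granted is reported through allocated.
+   */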
+  char * allocate(uint32_t min, uint32_t expect, uint32_t & allocated) {
+    if (_used + min > _capacity) {
+      return NULL;
+    } else if (_used + expect > _capacity) {
+      char * buff = _base + _used;
+      allocated = min;
+      _used += min;
+      return buff;
+    } else {
+      char * buff = _base + _used;
+      allocated = expect;
+      _used += expect;
+      return buff;
+    }
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* MEMORYPOOL_H_ */
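
A short illustration of that min/expect contract of MemoryPool::allocate (sizes chosen arbitrarily):

    #include "lib/MemoryPool.h"

    using namespace NativeTask;

    int main() {
      MemoryPool pool;
      pool.init(40 * 1024);  // 40 KB capacity

      uint32_t got = 0;
      char * a = pool.allocate(16 * 1024, 32 * 1024, got);  // got == 32K: expect fits
      char * b = pool.allocate(4 * 1024, 32 * 1024, got);   // got == 4K: only min fits
      char * c = pool.allocate(16 * 1024, 32 * 1024, got);  // NULL: 4 KB left < min
      (void)a; (void)b; (void)c;
      return 0;
    }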