You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC

svn commit: r1611413 [16/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdarg.h>
+#include "commons.h"
+#include "util/Random.h"
+#include "FileSystem.h"
+#include "test_commons.h"
+
+// Global configuration object consulted by the Generate* helpers below.
+Config TestConfig = Config();
+
+// Names of the TestConfig entries read by the Generate* helpers below.
+const char * GenerateSeed = "generate.seed";
+const char * GenerateChoice = "generate.choice";
+const char * GenerateLen = "generate.len";
+const char * GenerateKeyLen = "generate.key.len";
+const char * GenerateValueLen = "generate.value.len";
+const char * GenerateRange = "generate.range";
+const char * GenerateKeyRange = "generate.key.range";
+const char * GenerateValueRange = "generate.value.range";
+
+/**
+ * Append a NULL-terminated list of C strings to dest.
+ * Every variadic argument is read as a const char *; reading stops at the
+ * first NULL, which is not appended.
+ * NOTE(review): va_start is given a reference parameter here, which the C++
+ * standard leaves undefined - confirm the target compilers accept it.
+ * @param dest vector receiving the strings (existing content is kept)
+ * @return dest
+ */
+vector<string> & MakeStringArray(vector<string> & dest, ...) {
+  va_list args;
+  va_start(args, dest);
+  for (const char * item = va_arg(args, const char *); item != NULL;
+      item = va_arg(args, const char *)) {
+    dest.push_back(item);
+  }
+  va_end(args);
+  return dest;
+}
+
+/**
+ * Map a generator name to its GenerateType value.
+ * @param type one of "word", "number" or "bytes"
+ * @return the matching GenerateType; any other name is reported through
+ *         THROW_EXCEPTION(UnsupportException, ...)
+ */
+GenerateType GetGenerateType(const string & type) {
+  if (type == "word") {
+    return GenWord;
+  }
+  if (type == "number") {
+    return GenNumber;
+  }
+  if (type == "bytes") {
+    return GenBytes;
+  }
+  THROW_EXCEPTION(UnsupportException, "GenerateType not support");
+}
+
+/**
+ * Produce one random value of the requested kind into dest.
+ *  - GenWord:   delegates to r.nextWord(dest, choice)
+ *  - GenNumber: formats r.next_int32(choice) when choice > 0, otherwise
+ *               r.next_uint64(); zero-padded to len when len > 0
+ *  - GenBytes:  upper-case letters; the length is len, or len jittered by
+ *               range when range >= 2, or a random length when len <= 0
+ * @param dest  string overwritten with the generated value
+ * @param r     random source
+ * @param gtype kind of value to generate
+ * @param choice passed to the random source for words and numbers
+ * @param len   requested length, values <= 0 mean "not fixed"
+ * @param range length variation for GenBytes
+ * @return dest
+ */
+string & GenerateOne(string & dest, Random & r, GenerateType gtype, int64_t choice, int64_t len,
+    int64_t range) {
+  const char * const alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+  if (gtype == GenWord) {
+    r.nextWord(dest, choice);
+    return dest;
+  }
+
+  if (gtype == GenNumber) {
+    uint64_t number;
+    if (choice > 0) {
+      number = r.next_int32(choice);
+    } else {
+      number = r.next_uint64();
+    }
+    if (len > 0) {
+      dest = StringUtil::ToString(number, '0', len);
+    } else {
+      dest = StringUtil::ToString(number);
+    }
+    return dest;
+  }
+
+  if (gtype == GenBytes) {
+    if (range >= 2 && len > 0) {
+      // Jitter the requested length; a non-positive result yields an empty string.
+      int64_t jittered = len - range / 2 + r.next_int32(range);
+      if (jittered > 0) {
+        dest = r.nextBytes(jittered, alphabet);
+      } else {
+        dest = "";
+      }
+    } else if (len > 0) {
+      // Fixed length, no jitter configured.
+      dest = r.nextBytes(len, alphabet);
+    } else if (range < 2) {
+      // No length and no range configured: random length from next_int32(32).
+      dest = r.nextBytes(r.next_int32(32), alphabet);
+    } else {
+      // No length configured: random length from next_int32(range).
+      dest = r.nextBytes(r.next_int32(range), alphabet);
+    }
+    return dest;
+  }
+
+  THROW_EXCEPTION(IOException, "GenerateType not support");
+  return dest;
+}
+
+/**
+ * Append random strings to dest.
+ * Reads generate.seed, generate.choice, generate.len and generate.range from
+ * TestConfig; the random source is only seeded when generate.seed is set.
+ * @param dest vector the generated strings are appended to
+ * @param size number of strings to generate
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+vector<string> & Generate(vector<string> & dest, uint64_t size, const string & type) {
+  Random rng;
+  if (TestConfig.get(GenerateSeed) != NULL) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t len = TestConfig.getInt(GenerateLen, -1);
+  const int64_t range = TestConfig.getInt(GenerateRange, 1);
+  string item;
+  for (uint64_t produced = 0; produced != size; ++produced) {
+    GenerateOne(item, rng, kind, choice, len, range);
+    dest.push_back(item);
+  }
+  return dest;
+}
+
+/**
+ * Append random key/value string pairs to dest.
+ * Reads generate.seed, generate.choice, generate.key.len, generate.value.len,
+ * generate.key.range and generate.value.range from TestConfig.
+ * @param dest vector the generated pairs are appended to
+ * @param size number of pairs to generate
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+vector<pair<string, string> > & Generate(vector<pair<string, string> > & dest, uint64_t size,
+    const string & type) {
+  Random rng;
+  if (TestConfig.get(GenerateSeed) != NULL) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t keyLen = TestConfig.getInt(GenerateKeyLen, -1);
+  const int64_t valueLen = TestConfig.getInt(GenerateValueLen, -1);
+  const int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  const int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string k;
+  string v;
+  for (uint64_t produced = 0; produced != size; ++produced) {
+    // The key is always drawn before the value.
+    GenerateOne(k, rng, kind, choice, keyLen, keyRange);
+    GenerateOne(v, rng, kind, choice, valueLen, valueRange);
+    dest.push_back(std::make_pair(k, v));
+  }
+  return dest;
+}
+
+/**
+ * Append random key/value string pairs to dest until a byte budget is used up.
+ * Each pair is counted as key length + value length + 2, and generation stops
+ * once the running count reaches length.
+ * Reads generate.seed, generate.choice, generate.key.len, generate.value.len,
+ * generate.key.range and generate.value.range from TestConfig.
+ * @param dest vector the generated pairs are appended to
+ * @param length output bytes count
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+vector<pair<string, string> > & GenerateLength(vector<pair<string, string> > & dest,
+    uint64_t length, const string & type) {
+  Random r;
+  if (TestConfig.get(GenerateSeed) != NULL) {
+    r.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  GenerateType gtype = GetGenerateType(type);
+  int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  int64_t keylen = TestConfig.getInt(GenerateKeyLen, -1);
+  int64_t valuelen = TestConfig.getInt(GenerateValueLen, -1);
+  int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string key, value;
+  // Pre-size dest only when both lengths are configured. With the -1 defaults
+  // the sum is negative (it would be converted to a huge unsigned divisor) and
+  // a sum of zero would divide by zero.
+  if (keylen > 0 && valuelen > 0) {
+    const uint64_t pairLen = (uint64_t)keylen + (uint64_t)valuelen;
+    dest.reserve((size_t)(length / pairLen * 1.2));
+  }
+  for (uint64_t i = 0; i < length;) {
+    GenerateOne(key, r, gtype, choice, keylen, keyRange);
+    GenerateOne(value, r, gtype, choice, valuelen, valueRange);
+    dest.push_back(std::make_pair(key, value));
+    i += (key.length() + value.length() + 2);
+  }
+  return dest;
+}
+
+/**
+ * Append random KV text to dest, one pair per line:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * Reads the same TestConfig entries as the pair-based Generate().
+ * @param dest string the generated text is appended to
+ * @param size output kv pair count
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+string & GenerateKVText(string & dest, uint64_t size, const string & type) {
+  Random rng;
+  if (TestConfig.get(GenerateSeed) != NULL) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t keyLen = TestConfig.getInt(GenerateKeyLen, -1);
+  const int64_t valueLen = TestConfig.getInt(GenerateValueLen, -1);
+  const int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  const int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string k;
+  string v;
+  for (uint64_t line = 0; line != size; ++line) {
+    GenerateOne(k, rng, kind, choice, keyLen, keyRange);
+    GenerateOne(v, rng, kind, choice, valueLen, valueRange);
+    dest.append(k).append("\t").append(v).append("\n");
+  }
+  return dest;
+}
+
+/**
+ * Append random KV text to dest until it is at least length bytes long:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * Whole lines are appended, so the result may exceed length.
+ * @param dest string the generated text is appended to
+ * @param length output string length
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+string & GenerateKVTextLength(string & dest, uint64_t length, const string & type) {
+  Random rng;
+  if (TestConfig.get(GenerateSeed) != NULL) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t keyLen = TestConfig.getInt(GenerateKeyLen, -1);
+  const int64_t valueLen = TestConfig.getInt(GenerateValueLen, -1);
+  const int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  const int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string k;
+  string v;
+  for (; dest.length() < length;) {
+    GenerateOne(k, rng, kind, choice, keyLen, keyRange);
+    GenerateOne(v, rng, kind, choice, valueLen, valueRange);
+    dest.append(k).append("\t").append(v).append("\n");
+  }
+  return dest;
+}
+
+/**
+ * File <-> String utilities
+ */
+/**
+ * Append the whole content of a file to dest.
+ * @param dest string the file bytes are appended to
+ * @param path file to read, opened in binary mode
+ * @return dest; an IOException is raised when the file cannot be opened
+ */
+string & ReadFile(string & dest, const string & path) {
+  FILE * input = fopen(path.c_str(), "rb");
+  if (input == NULL) {
+    THROW_EXCEPTION(IOException, "file not found or can not open for read");
+  }
+  char chunk[1024 * 16];
+  size_t got;
+  // fread returns 0 at end of file or on error; either one ends the copy.
+  while ((got = fread(chunk, 1, sizeof(chunk), input)) > 0) {
+    dest.append(chunk, got);
+  }
+  fclose(input);
+  return dest;
+}
+
+/**
+ * Write content to a file, replacing any previous content.
+ * @param content bytes to write
+ * @param path file to create or truncate, opened in binary mode
+ * An IOException is raised when the file cannot be opened or when the
+ * content cannot be written completely.
+ */
+void WriteFile(const string & content, const string & path) {
+  FILE * fout = fopen(path.c_str(), "wb");
+  if (NULL == fout) {
+    THROW_EXCEPTION(IOException, "file can not open for write");
+  }
+  size_t wt = fwrite(content.c_str(), 1, content.length(), fout);
+  // Close before reporting any failure so the handle is never leaked. fclose
+  // also flushes buffered data, so its result is part of the write outcome.
+  bool closed = (fclose(fout) == 0);
+  if (wt != content.length() || !closed) {
+    THROW_EXCEPTION(IOException, "write file error");
+  }
+}
+
+/**
+ * Compare two files byte by byte.
+ * Both files are loaded completely with ReadFile, lh first.
+ * @param lh path of the first file
+ * @param rh path of the second file
+ * @return true when both files have identical content
+ */
+bool FileEqual(const string & lh, const string & rh) {
+  string left;
+  ReadFile(left, lh);
+  string right;
+  ReadFile(right, rh);
+  return left.compare(right) == 0;
+}
+
+/**
+ * Set up a generator producing keys of keylen and values of vallen characters.
+ * Both scratch buffers get 32 spare bytes beyond the requested length, and
+ * keyformat becomes a printf pattern of the form "%0<keylen>lx".
+ * @param keylen key length reported by key()
+ * @param vallen value length reported by value()
+ * @param unique when true, key() never reuses a random number
+ */
+KVGenerator::KVGenerator(uint32_t keylen, uint32_t vallen, bool unique)
+    : keylen(keylen), vallen(vallen), unique(unique), factor(2999999),
+      keyb(new char[keylen + 32]), valb(new char[vallen + 32]) {
+  snprintf(keyformat, sizeof(keyformat), "%%0%ulx", keylen);
+}
+
+// Release the key and value scratch buffers allocated by the constructor.
+KVGenerator::~KVGenerator() {
+  delete[] keyb;
+  delete[] valb;
+}
+
+/**
+ * Format the next random key into the internal key buffer.
+ * When the generator is unique, random numbers are drawn until one is found
+ * that has not been used by an earlier call.
+ * @param kl receives the key length (always keylen)
+ * @return the internal key buffer, overwritten by the next call
+ */
+char * KVGenerator::key(uint32_t & kl) {
+  long sample = lrand48();
+  if (unique) {
+    // insert() reports through .second whether the number was new.
+    while (!old_keys.insert(sample).second) {
+      sample = lrand48();
+    }
+  }
+  snprintf(keyb, keylen + 32, keyformat, sample);
+  kl = keylen;
+  return keyb;
+}
+
+/**
+ * Fill the internal value buffer with tab-terminated hexadecimal numbers.
+ * Each random number is rounded down to a multiple of factor before it is
+ * printed, and numbers are appended until at least vallen bytes are written.
+ * @param vl receives the value length (always vallen)
+ * @return the internal value buffer, overwritten by the next call
+ */
+char * KVGenerator::value(uint32_t & vl) {
+  for (uint32_t used = 0; used < vallen;) {
+    const long rounded = (lrand48() / factor) * factor;
+    used += snprintf(valb + used, vallen + 32 - used, "%09lx\t", rounded);
+  }
+  vl = vallen;
+  return valb;
+}
+
+/**
+ * Write "key\tvalue\n" records to fout until totallen bytes are used up.
+ * Whole records are written, so the output may exceed totallen; the stream
+ * is flushed before returning.
+ * @param fout destination stream
+ * @param totallen byte budget, each record counts key + value + 2 bytes
+ */
+void KVGenerator::write(FILE * fout, int64_t totallen) {
+  for (int64_t remaining = totallen; remaining > 0;) {
+    uint32_t keySize, valueSize;
+    char * k = key(keySize);
+    char * v = value(valueSize);
+    fwrite(k, keySize, 1, fout);
+    fputc('\t', fout);
+    fwrite(v, valueSize, 1, fout);
+    fputc('\n', fout);
+    remaining -= (keySize + valueSize + 2);
+  }
+  fflush(fout);
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TEST_COMMONS_H_
+#define TEST_COMMONS_H_
+
+#include "gtest/gtest.h"
+#include "commons.h"
+#include "util/Random.h"
+#include "util/StringUtil.h"
+#include "util/Timer.h"
+#include "Buffers.h"
+#include "BufferStream.h"
+
+using std::pair;
+using std::vector;
+using std::set;
+using std::map;
+using std::string;
+
+using namespace NativeTask;
+
+// Global configuration object shared by the tests; defined in test_commons.cc.
+extern Config TestConfig;
+
+/**
+ * Append a NULL-terminated list of C strings to dest and return dest.
+ * e.g. MakeStringArray(dest, "a", "b", "c", NULL) = {"a","b","c"}
+ */
+vector<string> & MakeStringArray(vector<string> & dest, ...);
+
+// Names of the TestConfig entries read by the Generate* helpers.
+extern const char * GenerateSeed;
+extern const char * GenerateChoice;
+extern const char * GenerateLen;
+extern const char * GenerateKeyLen;
+extern const char * GenerateValueLen;
+extern const char * GenerateRange;
+extern const char * GenerateKeyRange;
+extern const char * GenerateValueRange;
+
+// Kinds of random data understood by GenerateOne; the matching type names
+// accepted by GetGenerateType are "word", "number" and "bytes".
+enum GenerateType {
+  GenWord,
+  GenNumber,
+  GenBytes,
+};
+
+// Map a type name ("word", "number" or "bytes") to its GenerateType.
+GenerateType GetGenerateType(const string & type);
+
+// Overwrite dest with one random value of kind gtype drawn from r and return dest.
+string & GenerateOne(string & dest, Random & r, GenerateType gtype, int64_t choice, int64_t len,
+    int64_t range = 0);
+/**
+ * Generate random string sequences
+ * @param dest dest array
+ * @param size output array size
+ * @param type string type (word|number|bytes)
+ */
+vector<string> & Generate(vector<string> & dest, uint64_t size, const string & type);
+
+/**
+ * Generate random string pair sequences
+ * @param dest dest array
+ * @param size output array size
+ * @param type string type (word|number|bytes)
+ */
+vector<pair<string, string> > & Generate(vector<pair<string, string> > & dest, uint64_t size,
+    const string & type);
+
+/**
+ * Generate random string pair sequences
+ * @param dest dest array
+ * @param length output bytes count
+ * @param type string type (word|number|bytes)
+ */
+vector<pair<string, string> > & GenerateLength(vector<pair<string, string> > & dest,
+    uint64_t length, const string & type);
+
+/**
+ * Generate random KV text:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * @param dest dest string contain generated text
+ * @param size output kv pair count
+ * @param type string type (word|number|bytes)
+ */
+string & GenerateKVText(string & dest, uint64_t size, const string & type);
+
+/**
+ * Generate random KV text:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * @param dest dest string contain generated text
+ * @param length output string length
+ * @param type string type (word|number|bytes)
+ */
+string & GenerateKVTextLength(string & dest, uint64_t length, const string & type);
+
+/**
+ * File <-> String utilities
+ * ReadFile appends the content of path to dest and returns dest;
+ * WriteFile stores content in path.
+ */
+string & ReadFile(string & dest, const string & path);
+void WriteFile(const string & content, const string & path);
+
+/**
+ * File compare
+ * @return true when the files lh and rh have identical content
+ */
+bool FileEqual(const string & lh, const string & rh);
+
+/**
+ * generate k/v pairs with normal compression ratio
+ *
+ * Keys are random numbers printed as zero-padded hexadecimal; values are
+ * sequences of tab-terminated hexadecimal numbers. Both are produced into
+ * buffers owned by the generator, so a returned pointer is only valid until
+ * the next call of the same method.
+ */
+class KVGenerator {
+protected:
+  uint32_t keylen;       // key length reported by key()
+  uint32_t vallen;       // value length reported by value()
+  bool unique;           // when true, key() never reuses a random number
+  long factor;           // values are rounded down to a multiple of this
+  char * keyb;           // owned key buffer (keylen + 32 bytes)
+  char * valb;           // owned value buffer (vallen + 32 bytes)
+  char keyformat[32];    // printf pattern used to format keys
+  set<int64_t> old_keys; // random numbers already used for keys
+
+private:
+  // keyb and valb are released with delete[] in the destructor, so a copy
+  // would free them twice. Copying is disabled (declared, never defined).
+  KVGenerator(const KVGenerator &);
+  KVGenerator & operator=(const KVGenerator &);
+
+public:
+  KVGenerator(uint32_t keylen, uint32_t vallen, bool unique = false);
+
+  ~KVGenerator();
+
+  // Format the next key; kl receives its length.
+  char * key(uint32_t & kl);
+
+  // Format the next value; vl receives its length.
+  char * value(uint32_t & vl);
+
+  // Write "key\tvalue\n" records to fout until totallen bytes are used up.
+  void write(FILE * fout, int64_t totallen);
+};
+
+#endif /* TEST_COMMONS_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/Checksum.h"
+#include "test_commons.h"
+
+// Run one checksum pass of the given type over the len bytes at buff.
+// The resulting checksum value is not returned; only the work matters here.
+void TestChecksum(ChecksumType type, void * buff, uint32_t len) {
+  uint32_t chm = Checksum::init(type);
+  Checksum::update(type, chm, buff, len);
+}
+
+// Measure CRC32 and CRC32C throughput over a buffer of checksum.perf.size
+// bytes, repeating each checksum checksum.perf.time times.
+TEST(Perf, CRC) {
+  uint32_t len = TestConfig.getInt("checksum.perf.size", 1024 * 1024 * 50);
+  int testTime = TestConfig.getInt("checksum.perf.time", 2);
+  // Widen before multiplying: len * testTime in 32 bits wraps once the total
+  // passes 4GB. A non-positive repeat count processes nothing.
+  const uint64_t totalBytes = testTime > 0 ? (uint64_t)len * (uint64_t)testTime : 0;
+  char * buff = new char[len];
+  memset(buff, 1, len);
+  Timer timer;
+  for (int i = 0; i < testTime; i++) {
+    TestChecksum(CHECKSUM_CRC32, buff, len);
+  }
+  LOG("%s", timer.getSpeedM("CRC", totalBytes).c_str());
+  timer.reset();
+  for (int i = 0; i < testTime; i++) {
+    TestChecksum(CHECKSUM_CRC32C, buff, len);
+  }
+  LOG("%s", timer.getSpeedM("CRC32C", totalBytes).c_str());
+  delete[] buff;
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/Hash.h"
+#include "test_commons.h"
+
+/**
+ * Time Hash::BytesHash and Hash::CityHash over generated byte strings.
+ * @param len      value stored in generate.len before the data is generated
+ * @param size     number of strings to generate
+ * @param loopTime number of passes over the data per hash function
+ * @return sum of all hash results, returned so the hashing work stays
+ *         observable to the caller
+ */
+static uint64_t test_length(int64_t len, size_t size, size_t loopTime) {
+  vector<string> data;
+  TestConfig.setInt(GenerateLen, len);
+  Generate(data, size, "bytes");
+  Timer t;
+  // Must start at zero: the loops below accumulate into it.
+  uint64_t ret = 0;
+  for (size_t m = 0; m < loopTime; m++) {
+    for (size_t i = 0; i < data.size(); i++) {
+      ret += Hash::BytesHash(data[i].c_str(), data[i].length());
+    }
+  }
+  // %lld expects long long; int64_t may be plain long, so cast explicitly.
+  LOG("%s", t.getInterval(StringUtil::Format("Bytes%3lld", (long long)len).c_str()).c_str());
+  t.reset();
+  for (size_t m = 0; m < loopTime; m++) {
+    for (size_t i = 0; i < data.size(); i++) {
+      ret += Hash::CityHash(data[i].c_str(), data[i].length());
+    }
+  }
+  LOG("%s", t.getInterval(StringUtil::Format(" City%3lld", (long long)len).c_str()).c_str());
+  return ret;
+}
+
+// Run the hash timing for several string lengths and print the combined
+// hash sum to stderr.
+TEST(Perf, Hash) {
+  static const int64_t lengths[] = { 1, 17, 64, 128, 513 };
+  uint64_t total = 0;
+  for (size_t i = 0; i < sizeof(lengths) / sizeof(lengths[0]); i++) {
+    total += test_length(lengths[i], 100, 4000);
+  }
+  fprintf(stderr, "%llu\n", (long long unsigned int)total);
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/Process.h"
+#include "test_commons.h"
+
+// Run the command named by process.run.cmd ("ls" by default), log what it
+// printed and expect a zero return code.
+TEST(Process, Run) {
+  string stdoutText;
+  string stderrText;
+  string command = TestConfig.get("process.run.cmd", "ls");
+  int exitCode = Process::Run(command, &stdoutText, &stderrText);
+  LOG("cmd: %s", command.c_str());
+  LOG("stdout: %s", stdoutText.c_str());
+  LOG("stderr: %s", stderrText.c_str());
+  LOG("retcode: %d", exitCode);
+  EXPECT_EQ(0, exitCode);
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/StringUtil.h"
+#include "test_commons.h"
+
+// Check string <-> number conversions: float parsing, a float round trip,
+// unsigned 32/64-bit formatting and space-padded formatting to width 40.
+TEST(StringUtil, Convertion) {
+  ASSERT_FLOAT_EQ(StringUtil::toFloat("1.333"), 1.333);
+  ASSERT_FLOAT_EQ(StringUtil::toFloat(StringUtil::ToString(1.333f)), 1.333);
+  ASSERT_EQ(StringUtil::ToString(76957382U), "76957382");
+  ASSERT_EQ(StringUtil::ToString((uint64_t )76957382234233432ULL), "76957382234233432");
+  ASSERT_EQ(StringUtil::ToString(111, ' ', 40), "                                     111");
+}
+
+// Check printf-style formatting with mixed arguments and with a 999
+// character string argument.
+TEST(StringUtil, Format) {
+  string mixed = StringUtil::Format("%d %d %d %.3lf %s", 1, 2, 3, 1.333, "aaaaaaaaaaa");
+  ASSERT_EQ(mixed, "1 2 3 1.333 aaaaaaaaaaa");
+  string expected(999, 'a');
+  string formatted = StringUtil::Format("%s", expected.c_str());
+  ASSERT_EQ(expected, formatted);
+}
+
+// Trim must strip spaces and tabs from both ends and cope with inputs that
+// are empty or consist of whitespace only.
+TEST(StringUtil, Trim) {
+  ASSERT_EQ(StringUtil::Trim("  \taaaa  \t  "), "aaaa");
+  ASSERT_EQ(StringUtil::Trim("  \t  \t  "), "");
+  ASSERT_EQ(StringUtil::Trim(""), "");
+}
+
+// ToLower must lower-case letters, keep digits unchanged and accept an
+// empty string.
+TEST(StringUtil, ToLower) {
+  ASSERT_EQ(StringUtil::ToLower("111ABabABabAbaB222"), "111abababababab222");
+  ASSERT_EQ(StringUtil::ToLower(""), "");
+}
+
+// Check Split with and without its boolean flag, and Join on the result.
+// With the flag set, the padded input must split into the same pieces as the
+// clean input; with it cleared, empty pieces are expected in the result.
+TEST(StringUtil, JoinSplit) {
+  vector<string> temp1, temp2, temp3, temp4;
+  StringUtil::Split("1aaa bbb ccc", " ", temp1, false);
+  StringUtil::Split("  1aaa  bbb  ccc ", " ", temp2, true);
+  ASSERT_EQ(temp1, temp2);
+  string j = StringUtil::Join(temp1, ",");
+  ASSERT_EQ(j, "1aaa,bbb,ccc");
+  StringUtil::Split("  a b ", " ", temp3, false);
+  // The sentinel is read back as const char *, so it is passed with that
+  // type: a bare NULL may be a plain int in a variadic call.
+  ASSERT_EQ(temp3, MakeStringArray(temp4, "", "", "a", "b", "", (const char *)NULL));
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/SyncUtils.h"
+#include "test_commons.h"
+
+// Thread whose body calls usleep(100) five times and logs after each call.
+class TestThread : public Thread {
+  virtual void run() {
+    for (uint32_t i = 0; i < 5; i++) {
+      usleep(100);
+      LOG("sleep %d", i * 100);
+    }
+  }
+};
+
+// Start three TestThread instances, then join them in start order.
+TEST(SyncUtil, Thread) {
+  TestThread workers[3];
+  for (size_t i = 0; i < 3; i++) {
+    workers[i].start();
+  }
+  for (size_t i = 0; i < 3; i++) {
+    workers[i].join();
+  }
+}
+
+// Plain class whose member functions are wrapped with BindNew and run on
+// threads by the ThreadBind test.
+class TestBind {
+public:
+
+  // Member function with a return value and no arguments.
+  int get() {
+    return 100;
+  }
+
+  // Calls usleep(100) twice, logging after each call.
+  void foo() {
+    for (uint32_t i = 0; i < 2; i++) {
+      usleep(100);
+      LOG("usleep %d", i * 100);
+    }
+  }
+  // Same as foo(), with msg appended to every log line.
+  void bar(const char * msg) {
+    for (uint32_t i = 0; i < 2; i++) {
+      usleep(100);
+      LOG("usleep %d %s", i * 100, msg);
+    }
+  }
+
+};
+
+// Run two bound member functions (one without arguments, one with a string
+// argument) on their own threads; the Runnables are deleted after the joins.
+TEST(SyncUtil, ThreadBind) {
+  TestBind target = TestBind();
+  Runnable * getTask = BindNew(target, &TestBind::get);
+  Thread getThread = Thread(getTask);
+  Runnable * barTask = BindNew(target, &TestBind::bar, "testmsg");
+  Thread barThread = Thread(barTask);
+  getThread.start();
+  barThread.start();
+  getThread.join();
+  barThread.join();
+
+  delete getTask;
+  delete barTask;
+}
+
+//class TestParallelFor {
+//protected:
+//  SpinLock lock;
+//  uint64_t aggr;
+//public:
+//  TestParallelFor() : aggr(0) {
+//  }
+//  void add(uint64_t i) {
+//    lock.lock();
+//    aggr += i;
+//    lock.unlock();
+//  }
+//  void test(uint64_t n, size_t threadnum) {
+//    aggr = 0;
+//    ParallelFor(*this, &TestParallelFor::add, 0ULL, n, threadnum);
+//    ASSERT_EQ(n*(n-1)/2, aggr);
+//  }
+//};
+//
+//TEST(SyncUtil, ParallelFor) {
+//  TestParallelFor tpf;
+//  tpf.test(100000, 2);
+//  tpf.test(100000, 3);
+//  tpf.test(100000, 4);
+//}
+
+// Time how long it takes to start and join thread.num (default 1000)
+// default-constructed threads.
+TEST(Perf, ThreadOverhead) {
+  int64_t threadnum = TestConfig.getInt("thread.num", 1000);
+  Thread * pool = new Thread[threadnum];
+  Timer timer;
+  for (uint32_t started = 0; started < threadnum; started++) {
+    pool[started].start();
+  }
+  for (uint32_t joined = 0; joined < threadnum; joined++) {
+    pool[joined].join();
+  }
+  LOG("%lld thread %s", (long long int )threadnum, timer.getInterval("start&join").c_str());
+  delete[] pool;
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/WritableUtils.h"
+#include "test_commons.h"
+
+// Encodes `v` with WritableUtils::WriteVLong, decodes a copy of the bytes with
+// WritableUtils::ReadVLong, and asserts that the value survives and that both
+// reported sizes match WritableUtils::GetVLongSize(v).
+void TestVLong(int64_t v) {
+  char encoded[1024];
+  char copied[1024];
+  const uint32_t expectedSize = WritableUtils::GetVLongSize(v);
+  uint32_t writtenSize = (uint32_t)-1;
+  WritableUtils::WriteVLong(v, encoded, writtenSize);
+  ASSERT_EQ(expectedSize, writtenSize);
+  // Decode from a separate buffer rather than from the one just written.
+  memcpy(copied, encoded, writtenSize);
+  uint32_t readSize;
+  const int64_t decoded = WritableUtils::ReadVLong(copied, readSize);
+  ASSERT_EQ(v, decoded);
+  ASSERT_EQ(readSize, expectedSize);
+}
+
+
+// Round-trips `test.size` (default 3000) values drawn from Random::nextLog2,
+// each one both as generated and negated, through TestVLong.
+TEST(WritableUtils, VLong) {
+  const int rounds = TestConfig.getInt("test.size", 3000);
+  const int seed = TestConfig.getInt("test.seed", -1);
+  Random rng(seed);
+  int done = 0;
+  while (done < rounds) {
+    uint64_t v = rng.nextLog2(((uint64_t)-1) / 2 - 3);
+    TestVLong(v);
+    TestVLong(-v);
+    ++done;
+  }
+}
+
+
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out Thu Jul 17 17:44:55 2014
@@ -0,0 +1,2 @@

[... 4 lines stripped ...]
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform Thu Jul 17 17:44:55 2014
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.mapred.nativetask.HadoopPlatform

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+
+/**
+ * Checks that each key/value class stored on a {@link TaskContext} through a setter is the one
+ * the matching getter returns.
+ */
+public class TestTaskContext extends TestCase {
+
+  /** Sets the four input/output key and value classes and compares the class names read back. */
+  public void testTaskContext() {
+    final TaskContext ctx = new TaskContext(null, null, null, null, null, null, null);
+
+    ctx.setInputKeyClass(IntWritable.class);
+    final String inputKeyName = ctx.getInputKeyClass().getName();
+    assertEquals(IntWritable.class.getName(), inputKeyName);
+
+    ctx.setInputValueClass(Text.class);
+    final String inputValueName = ctx.getInputValueClass().getName();
+    assertEquals(Text.class.getName(), inputValueName);
+
+    ctx.setOutputKeyClass(LongWritable.class);
+    final String outputKeyName = ctx.getOuputKeyClass().getName();
+    assertEquals(LongWritable.class.getName(), outputKeyName);
+
+    ctx.setOutputValueClass(FloatWritable.class);
+    final String outputValueName = ctx.getOutputValueClass().getName();
+    assertEquals(FloatWritable.class.getName(), outputValueName);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.nativetask.DataReceiver;
+import org.apache.hadoop.mapred.nativetask.NativeDataSource;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPullee;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPuller;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPushee;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPusher;
+import org.apache.hadoop.mapred.nativetask.handlers.IDataLoader;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.util.Progress;
+import org.junit.Before;
+
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestBufferPushPull extends TestCase {
+
+  public static int BUFFER_LENGTH = 100; // 100 bytes
+  public static int INPUT_KV_COUNT = 1000;
+  private KV<BytesWritable, BytesWritable>[] dataInput;
+
+  /** Generates the {@code INPUT_KV_COUNT} input records the tests compare against. */
+  @Override
+  @Before
+  public void setUp() {
+    dataInput = TestInput.getMapInputs(INPUT_KV_COUNT);
+  }
+
+  /**
+   * Pushes every generated record through a {@link BufferPusher} whose target forwards the
+   * output to a {@link BufferPushee}, and checks that the records reach the writer in input
+   * order and that none is missing.
+   */
+  public void testPush() throws Exception {
+    final byte[] buff = new byte[BUFFER_LENGTH];
+
+    // Both buffers are constructed over the same array.
+    final InputBuffer input = new InputBuffer(buff);
+
+    final OutputBuffer out = new OutputBuffer(buff);
+
+    final Class<BytesWritable> iKClass = BytesWritable.class;
+    final Class<BytesWritable> iVClass = BytesWritable.class;
+
+    final RecordWriterForPush writer = new RecordWriterForPush() {
+      @Override
+      public void write(BytesWritable key, BytesWritable value) throws IOException {
+        final KV expect = dataInput[count++];
+        Assert.assertEquals(expect.key.toString(), key.toString());
+        Assert.assertEquals(expect.value.toString(), value.toString());
+      }
+    };
+
+    final BufferPushee pushee = new BufferPushee(iKClass, iVClass, writer);
+
+    final PushTarget handler = new PushTarget(out) {
+
+      @Override
+      public void sendData() throws IOException {
+        // Expose what was written so far to the reader side, then reset the writer side.
+        final int outputLength = out.length();
+        input.rewind(0, outputLength);
+        out.rewind();
+        pushee.collect(input);
+      }
+    };
+
+    final BufferPusher pusher = new BufferPusher(iKClass, iVClass, handler);
+
+    writer.reset();
+    for (int i = 0; i < INPUT_KV_COUNT; i++) {
+      pusher.collect(dataInput[i].key, dataInput[i].value);
+    }
+    pusher.close();
+    pushee.close();
+
+    // Without this check the test would also pass when only some records were delivered.
+    Assert.assertEquals(INPUT_KV_COUNT, writer.count);
+  }
+
+  /**
+   * Feeds the generated records to a {@link BufferPullee} and reads them back through a
+   * {@link BufferPuller}, comparing each pulled key/value with the input at the same index.
+   */
+  public void testPull() throws Exception {
+    final byte[] buff = new byte[BUFFER_LENGTH];
+    final NativeHandlerForPull handler =
+        new NativeHandlerForPull(new InputBuffer(buff), new OutputBuffer(buff));
+
+    final Class<BytesWritable> iKClass = BytesWritable.class;
+    final Class<BytesWritable> iVClass = BytesWritable.class;
+
+    final BufferPullee pullee = new BufferPullee(iKClass, iVClass, new KeyValueIterator(), handler);
+    handler.setDataLoader(pullee);
+
+    final BufferPuller puller = new BufferPuller(handler);
+    handler.setDataReceiver(puller);
+
+    for (int index = 0; puller.next(); index++) {
+      final DataInputBuffer rawKey = puller.getKey();
+      final DataInputBuffer rawValue = puller.getValue();
+
+      final BytesWritable pulledKey = new BytesWritable();
+      final BytesWritable pulledValue = new BytesWritable();
+      pulledKey.readFields(rawKey);
+      pulledValue.readFields(rawValue);
+
+      final KV expected = dataInput[index];
+      Assert.assertEquals(expected.key.toString(), pulledKey.toString());
+      Assert.assertEquals(expected.value.toString(), pulledValue.toString());
+    }
+
+    puller.close();
+    pullee.close();
+  }
+
+  /**
+   * Partial {@link NativeDataTarget} that stores the output buffer and treats the final send like
+   * any other send; subclasses supply {@link #sendData()}.
+   */
+  public abstract class PushTarget implements NativeDataTarget {
+    OutputBuffer out;
+
+    PushTarget(OutputBuffer out) {
+      this.out = out;
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return this.out;
+    }
+
+    /** Delegates to {@link #sendData()}. */
+    @Override
+    public void finishSendData() throws IOException {
+      this.sendData();
+    }
+
+    @Override
+    public abstract void sendData() throws IOException;
+  }
+
+  /**
+   * {@link RecordWriter} base for the push test: keeps a record counter that subclasses advance
+   * in {@link #write} and that {@link #reset()} sets back to zero. Closing does nothing.
+   */
+  public abstract class RecordWriterForPush implements RecordWriter<BytesWritable, BytesWritable> {
+
+    protected int count;
+
+    RecordWriterForPush() {
+      this.count = 0;
+    }
+
+    public void reset() {
+      this.count = 0;
+    }
+
+    @Override
+    public abstract void write(BytesWritable key, BytesWritable value) throws IOException;
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      // nothing to release
+    }
+  }
+
+  /**
+   * In-process stand-in for the native side of a pull: acts as the data source for the puller
+   * and as the data target for the pullee, passing data between the two supplied buffers.
+   */
+  public static class NativeHandlerForPull implements NativeDataSource, NativeDataTarget {
+
+    InputBuffer in;
+    private final OutputBuffer out;
+
+    private IDataLoader dataLoader;
+    private DataReceiver dataReceiver;
+
+    public NativeHandlerForPull(InputBuffer input, OutputBuffer out) {
+      this.in = input;
+      this.out = out;
+    }
+
+    @Override
+    public InputBuffer getInputBuffer() {
+      return in;
+    }
+
+    @Override
+    public void setDataReceiver(DataReceiver handler) {
+      this.dataReceiver = handler;
+    }
+
+    /** Asks the registered loader for more data; the size it reports is not needed here. */
+    @Override
+    public void loadData() throws IOException {
+      dataLoader.load();
+    }
+
+    public void setDataLoader(IDataLoader dataLoader) {
+      this.dataLoader = dataLoader;
+    }
+
+    /**
+     * Rewinds the output buffer, sets the input buffer to cover the length that had been
+     * written, and notifies the receiver.
+     */
+    @Override
+    public void sendData() throws IOException {
+      final int len = out.length();
+      out.rewind();
+      in.rewind(0, len);
+      dataReceiver.receiveData();
+    }
+
+    /** Notifies the receiver without touching either buffer. */
+    @Override
+    public void finishSendData() throws IOException {
+      dataReceiver.receiveData();
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return this.out;
+    }
+  }
+
+  /**
+   * {@link RawKeyValueIterator} over the generated test input: each {@link #next()} selects the
+   * following record, and the getters return that record serialized into a fresh
+   * {@link DataInputBuffer}.
+   */
+  public class KeyValueIterator implements RawKeyValueIterator {
+    int count = 0;
+    BytesWritable key;
+    BytesWritable value;
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      return convert(key);
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      return convert(value);
+    }
+
+    /** Serializes {@code b} with its own {@code write} method and wraps the resulting bytes. */
+    private DataInputBuffer convert(BytesWritable b) throws IOException {
+      final ByteArrayOutputStream out = new ByteArrayOutputStream();
+      b.write(new DataOutputStream(out));
+      final byte[] array = out.toByteArray();
+      final DataInputBuffer result = new DataInputBuffer();
+      result.reset(array, array.length);
+      return result;
+    }
+
+    /**
+     * Advances to the next input record.
+     *
+     * @return {@code true} if a record was selected, {@code false} once all
+     *         {@code INPUT_KV_COUNT} records have been handed out
+     */
+    @Override
+    public boolean next() throws IOException {
+      if (count < INPUT_KV_COUNT) {
+        key = dataInput[count].key;
+        // Take the record's value; this previously re-used the key by mistake.
+        value = dataInput[count].value;
+        count++;
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestByteBufferReadWrite extends TestCase{
+  
+  
+  /**
+   * Writes one value of each kind through a {@link ByteBufferDataWriter}, then reads them back in
+   * the same order with a {@link ByteBufferDataReader} over a buffer built from the same byte
+   * array, and expects nothing to be left unread.
+   */
+  public void testReadWrite() throws IOException {
+    byte[] buff = new byte[10000];
+
+    InputBuffer input = new InputBuffer(buff);
+    MockDataTarget target = new MockDataTarget(buff);
+    ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+
+    writer.write(1);
+    writer.write(new byte[] {2, 2}, 0, 2);
+    writer.writeBoolean(true);
+    writer.writeByte(4);
+    writer.writeShort(5);
+    writer.writeChar(6);
+    writer.writeInt(7);
+    writer.writeLong(8);
+    writer.writeFloat(9);
+    writer.writeDouble(10);
+    writer.writeBytes("goodboy");
+    writer.writeChars("hello");
+    writer.writeUTF("native task");
+
+    // Make exactly the written bytes readable.
+    int length = target.getOutputBuffer().length();
+    input.rewind(0, length);
+    ByteBufferDataReader reader = new ByteBufferDataReader(input);
+
+    Assert.assertEquals(1, reader.read());
+    byte[] two = new byte[2];
+    reader.read(two);
+    Assert.assertTrue(two[0] == two[1] && two[0] == 2);
+
+    Assert.assertEquals(true, reader.readBoolean());
+    Assert.assertEquals(4, reader.readByte());
+    Assert.assertEquals(5, reader.readShort());
+    Assert.assertEquals(6, reader.readChar());
+    Assert.assertEquals(7, reader.readInt());
+    Assert.assertEquals(8, reader.readLong());
+    // Two-sided tolerance: the former "actual - expected < delta" check also accepted any value
+    // smaller than the expected one.
+    Assert.assertEquals(9f, reader.readFloat(), 0.0001f);
+    Assert.assertEquals(10d, reader.readDouble(), 0.0001d);
+
+    byte[] goodboy = new byte["goodboy".length()];
+    reader.read(goodboy);
+    Assert.assertEquals("goodboy", toString(goodboy));
+
+    char[] hello = new char["hello".length()];
+    for (int i = 0; i < hello.length; i++) {
+      hello[i] = reader.readChar();
+    }
+
+    String helloString = new String(hello);
+    Assert.assertEquals("hello", helloString);
+
+    Assert.assertEquals("native task", reader.readUTF());
+
+    Assert.assertEquals(0, input.remaining());
+  }
+  
+  /**
+   * A writer over a 10-byte buffer reports unflushed data after three bytes are written and
+   * reports being short of space when asked for 100 bytes.
+   */
+  public void testShortOfSpace() throws IOException {
+    final MockDataTarget target = new MockDataTarget(new byte[10]);
+    final ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+    Assert.assertFalse(writer.hasUnFlushedData());
+
+    writer.write(1);
+    final byte[] pair = {2, 2};
+    writer.write(pair, 0, 2);
+    Assert.assertTrue(writer.hasUnFlushedData());
+
+    Assert.assertTrue(writer.shortOfSpace(100));
+  }
+
+  /**
+   * Writes 101 bytes through a writer over a 10-byte buffer and expects, after close, eleven
+   * {@code sendData} calls and one {@code finishSendData} call on the target.
+   */
+  public void testFlush() throws IOException {
+    final Counter sendCalls = new Counter();
+    final Flag finished = new Flag();
+    final byte[] backing = new byte[10];
+    final MockDataTarget target = new MockDataTarget(backing) {
+      @Override
+      public void finishSendData() throws IOException {
+        finished.set(true);
+      }
+
+      @Override
+      public void sendData() throws IOException {
+        sendCalls.increase();
+      }
+    };
+
+    final ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+    Assert.assertFalse(writer.hasUnFlushedData());
+
+    writer.write(1);
+    writer.write(new byte[100]);
+    Assert.assertTrue(writer.hasUnFlushedData());
+
+    writer.close();
+    Assert.assertEquals(11, sendCalls.get());
+    Assert.assertTrue(finished.get());
+  }
+  
+  private static String toString(byte[] str) throws UnsupportedEncodingException {
+    return new String(str, 0, str.length, "UTF-8");
+  }
+  
+  /**
+   * {@link NativeDataTarget} whose send hooks do nothing; it only exposes an
+   * {@link OutputBuffer} built from the array passed to the constructor.
+   */
+  private static class MockDataTarget implements NativeDataTarget {
+
+    private OutputBuffer out;
+
+    MockDataTarget(byte[] buffer) {
+      out = new OutputBuffer(buffer);
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return this.out;
+    }
+
+    @Override
+    public void sendData() throws IOException {
+      // intentionally empty
+    }
+
+    @Override
+    public void finishSendData() throws IOException {
+      // intentionally empty
+    }
+  }
+  
+  /** Mutable int holder that anonymous classes can bump through a final reference. */
+  private static class Counter {
+    private int count = 0;
+
+    public void increase() {
+      this.count += 1;
+    }
+
+    public int get() {
+      return this.count;
+    }
+  }
+  
+  /** Mutable boolean holder that anonymous classes can set through a final reference. */
+  private static class Flag {
+    private boolean value = false;
+
+    public boolean get() {
+      return value;
+    }
+
+    public void set(boolean status) {
+      value = status;
+    }
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+public class TestDirectBufferPool {
+
+  /**
+   * Calls {@link DirectBufferPool#getInstance()} from many threads at once and checks that every
+   * thread was handed the same pool object.
+   */
+  @Test
+  public void testGetInstance() throws Exception {
+    final int num = 100;
+    // Every worker appends to this list concurrently and ArrayList is not thread-safe, so wrap it.
+    final List<DirectBufferPool> pools =
+        java.util.Collections.synchronizedList(new ArrayList<DirectBufferPool>());
+    Thread[] list = new Thread[num];
+    for (int i = 0; i < num; i++)  {
+      Thread t = getPoolThread(pools);
+      t.start();
+      list[i] = t;
+    }
+    for (int i = 0; i < num; i++) {
+      // Let an interrupt fail the test instead of printing a stack trace and carrying on.
+      list[i].join(10000);
+      assertTrue("worker " + i + " did not finish within 10s", !list[i].isAlive());
+    }
+    // Each worker must have contributed exactly one entry.
+    assertEquals(num, pools.size());
+    DirectBufferPool p1 = pools.get(0);
+    assertNotNull(p1);
+    for (int i = 1; i < pools.size(); i++) {
+      DirectBufferPool p2 = pools.get(i);
+      assertNotNull(p2);
+      assertSame(p1, p2);
+    }
+  }
+
+  /** Builds (without starting) a thread that appends the pool singleton to {@code pools}. */
+  private Thread getPoolThread(final List<DirectBufferPool> pools) {
+    return new Thread() {
+      @Override
+      public void run() {
+        final DirectBufferPool instance = DirectBufferPool.getInstance();
+        pools.add(instance);
+      }
+    };
+  }
+
+
+  /**
+   * A returned buffer is handed out again on the next borrow of the same capacity, while a
+   * further borrow yields a different buffer.
+   */
+  @Test
+  public void testBufBorrow() throws IOException {
+    final DirectBufferPool pool = DirectBufferPool.getInstance();
+
+    final ByteBuffer first = pool.borrowBuffer(100);
+    assertTrue(first.isDirect());
+    assertEquals(0, first.position());
+    assertEquals(100, first.capacity());
+    pool.returnBuffer(first);
+
+    final ByteBuffer reused = pool.borrowBuffer(100);
+    assertTrue(reused.isDirect());
+    assertEquals(0, reused.position());
+    assertEquals(100, reused.capacity());
+    assertSame(first, reused);
+
+    final ByteBuffer other = pool.borrowBuffer(100);
+    assertTrue(other.isDirect());
+    assertEquals(0, other.position());
+    assertEquals(100, other.capacity());
+    assertNotSame(reused, other);
+
+    pool.returnBuffer(reused);
+    pool.returnBuffer(other);
+  }
+
+  /**
+   * A buffer whose position was advanced before being returned comes back from the pool with
+   * position zero.
+   */
+  @Test
+  public void testBufReset() throws IOException {
+    final DirectBufferPool pool = DirectBufferPool.getInstance();
+
+    final ByteBuffer used = pool.borrowBuffer(100);
+    assertTrue(used.isDirect());
+    assertEquals(0, used.position());
+    assertEquals(100, used.capacity());
+
+    used.putInt(1);
+    assertEquals(4, used.position());
+    pool.returnBuffer(used);
+
+    final ByteBuffer again = pool.borrowBuffer(100);
+    assertSame(used, again);
+    assertTrue(again.isDirect());
+    assertEquals(0, again.position());
+    assertEquals(100, again.capacity());
+  }
+
+  /**
+   * Borrows 100 buffers, returns each from its own thread, and expects the pool to report all of
+   * them for that capacity afterwards.
+   */
+  @Test
+  public void testBufReturn() throws IOException {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    final int numOfBufs = 100;
+    final int capacity = 100;
+
+    final ByteBuffer[] bufs = new ByteBuffer[numOfBufs];
+    int borrowed = 0;
+    while (borrowed < numOfBufs) {
+      bufs[borrowed] = bufferPool.borrowBuffer(capacity);
+      borrowed++;
+    }
+
+    assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
+
+    final Thread[] workers = new Thread[numOfBufs];
+    for (int i = 0; i < workers.length; i++) {
+      final Thread worker = retBufThread(bufferPool, bufs, i);
+      worker.start();
+      workers[i] = worker;
+    }
+    for (final Thread worker : workers) {
+      try {
+        worker.join(10000);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
+  }
+
+  private Thread retBufThread(final DirectBufferPool bufferPool, final ByteBuffer[] bufs, final int i) {
+       Thread t = new Thread(new Runnable(){
+        @Override
+        public void run() {
+          try {
+          bufferPool.returnBuffer(bufs[i]);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      });
+    return t;
+  }
+
+  /**
+   * Returning {@code null} or a buffer from {@link ByteBuffer#allocate(int)} must raise an
+   * {@link IOException}.
+   */
+  @Test
+  public void testBufException() {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+
+    boolean nullRejected = false;
+    try {
+      bufferPool.returnBuffer(null);
+    } catch (IOException e) {
+      nullRejected = true;
+    }
+    assertTrue(nullRejected);
+
+    boolean heapRejected = false;
+    final ByteBuffer heapBuffer = ByteBuffer.allocate(100);
+    try {
+      bufferPool.returnBuffer(heapBuffer);
+    } catch (IOException e) {
+      heapRejected = true;
+    }
+    assertTrue(heapRejected);
+  }
+
+  /**
+   * Returns 100 borrowed buffers, drops the local references to them, requests garbage
+   * collection, and expects the pool to report no buffers for that capacity after one more
+   * borrow.
+   * NOTE(review): presumably the pool holds returned buffers weakly, as the test name suggests;
+   * confirm in DirectBufferPool.
+   */
+  @Test
+  public void testBufWeakRefClear() throws IOException {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    int numOfBufs = 100;
+    int capacity = 100;
+    // Size the array by the number of buffers; it was sized by the buffer capacity, which only
+    // worked because both happen to be 100.
+    ByteBuffer[] list = new ByteBuffer[numOfBufs];
+    for (int i = 0; i < numOfBufs; i++) {
+      list[i] = bufferPool.borrowBuffer(capacity);
+    }
+    for (int i = 0; i < numOfBufs; i++) {
+      bufferPool.returnBuffer(list[i]);
+      // Drop the strong reference so only the pool can still reach the buffer.
+      list[i] = null;
+    }
+
+    assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
+
+    for (int i = 0; i < 3; i++) {
+      System.gc();
+    }
+
+    ByteBuffer b = bufferPool.borrowBuffer(capacity);
+    assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.junit.Assert;
+
+/**
+ * Verifies the initial state of an {@link InputBuffer} for each way the test constructs one.
+ */
+public class TestInputBuffer extends TestCase {
+
+  /**
+   * A newly built buffer must report the expected type and capacity, and zero for position,
+   * length and remaining.
+   */
+  public void testInputBuffer() throws IOException {
+    final int size = 100;
+
+    final InputBuffer input1 = new InputBuffer(BufferType.DIRECT_BUFFER, size);
+    assertNewBuffer(BufferType.DIRECT_BUFFER, size, input1);
+
+    final InputBuffer input2 = new InputBuffer(BufferType.HEAP_BUFFER, size);
+    assertNewBuffer(BufferType.HEAP_BUFFER, size, input2);
+
+    // A buffer built from a byte array is expected to report itself as a heap buffer.
+    final InputBuffer input3 = new InputBuffer(new byte[size]);
+    assertNewBuffer(BufferType.HEAP_BUFFER, size, input3);
+  }
+
+  /**
+   * Shared checks for a buffer nothing has been read from or loaded into yet. Uses
+   * {@code assertEquals(expected, actual)} throughout so a failure message names both values
+   * the right way round.
+   */
+  private static void assertNewBuffer(BufferType expectedType, int expectedCapacity,
+      InputBuffer buffer) {
+    Assert.assertEquals(expectedType, buffer.getType());
+    Assert.assertEquals(0, buffer.position());
+    Assert.assertEquals(0, buffer.length());
+    Assert.assertEquals(0, buffer.remaining());
+    Assert.assertEquals(expectedCapacity, buffer.capacity());
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import junit.framework.TestCase;
+
+import org.junit.Assert;
+
+public class TestOutputBuffer extends TestCase {
+  /** A new OutputBuffer reports its buffer type, a length of 0, and a limit equal to the size it was given. */
+  public void testOutputBuffer() {
+    final int size = 100;
+
+    // Direct buffer requested explicitly.
+    final OutputBuffer directOutput = new OutputBuffer(BufferType.DIRECT_BUFFER, size);
+    Assert.assertEquals(BufferType.DIRECT_BUFFER, directOutput.getType());
+    Assert.assertEquals(0, directOutput.length());
+    Assert.assertEquals(size, directOutput.limit());
+    // Heap buffer requested explicitly.
+    final OutputBuffer heapOutput = new OutputBuffer(BufferType.HEAP_BUFFER, size);
+    Assert.assertEquals(BufferType.HEAP_BUFFER, heapOutput.getType());
+    Assert.assertEquals(0, heapOutput.length());
+    Assert.assertEquals(size, heapOutput.limit());
+    // Wrapping an existing byte array is expected to yield a heap buffer.
+    final OutputBuffer wrappedOutput = new OutputBuffer(new byte[size]);
+    Assert.assertEquals(BufferType.HEAP_BUFFER, wrappedOutput.getType());
+    Assert.assertEquals(0, wrappedOutput.length());
+    Assert.assertEquals(size, wrappedOutput.limit());
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.Task.CombinerRunner;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestCombineHandler extends TestCase {
+  private CombinerHandler handler;
+  private INativeHandler nativeHandler;
+  private BufferPusher pusher;
+  private BufferPuller puller;
+  private CombinerRunner combinerRunner;
+
+  /** Mocks every collaborator; the mocked native handler hands out a heap InputBuffer of capacity 100. */
+  @Override
+  public void setUp() throws IOException {
+    nativeHandler = Mockito.mock(INativeHandler.class);
+    pusher = Mockito.mock(BufferPusher.class);
+    puller = Mockito.mock(BufferPuller.class);
+    combinerRunner = Mockito.mock(CombinerRunner.class);
+    Mockito.when(nativeHandler.getInputBuffer()).thenReturn(new InputBuffer(BufferType.HEAP_BUFFER, 100));
+  }
+
+  /** Constructing the handler registers it as command dispatcher and the puller as data receiver, once each. */
+  public void testCommandDispatcherSetting() throws IOException {
+    handler = new CombinerHandler(nativeHandler, combinerRunner, puller, pusher);
+    Mockito.verify(nativeHandler).setCommandDispatcher(Matchers.eq(handler));
+    Mockito.verify(nativeHandler).setDataReceiver(Matchers.eq(puller));
+  }
+
+  /** COMBINE runs the combiner once with puller and pusher; closing twice closes each collaborator once. */
+  public void testCombine() throws IOException, InterruptedException, ClassNotFoundException {
+    handler = new CombinerHandler(nativeHandler, combinerRunner, puller, pusher);
+    Assert.assertEquals(null, handler.onCall(CombinerHandler.COMBINE, null));
+    handler.close();
+    handler.close();
+    // verify(mock) without an explicit mode checks for exactly one invocation.
+    Mockito.verify(combinerRunner).combine(Matchers.eq(puller), Matchers.eq(pusher));
+    Mockito.verify(pusher).close();
+    Mockito.verify(puller).close();
+    Mockito.verify(nativeHandler).close();
+  }
+
+  /** Calling with a Command built from -1 (presumably an unknown command id) returns null. */
+  public void testOnCall() throws IOException {
+    handler = new CombinerHandler(nativeHandler, combinerRunner, puller, pusher);
+    Assert.assertEquals(null, handler.onCall(new Command(-1), null));
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.OutputUtil;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestNativeCollectorOnlyHandler extends TestCase {
+
+  private NativeCollectorOnlyHandler handler;
+  private INativeHandler nativeHandler;
+  private BufferPusher pusher;
+  private ICombineHandler combiner;
+  private TaskContext taskContext;
+  private String localDir = "build/test/mapred/local";
+
+  /** Mocks the collaborators and builds a TaskContext whose JobConf names LocalJobOutputFiles and localDir. */
+  @Override
+  public void setUp() throws IOException {
+    this.nativeHandler = Mockito.mock(INativeHandler.class);
+    this.pusher = Mockito.mock(BufferPusher.class);
+    this.combiner = Mockito.mock(ICombineHandler.class);
+    JobConf jobConf = new JobConf();
+    jobConf.set(OutputUtil.NATIVE_TASK_OUTPUT_MANAGER,
+        "org.apache.hadoop.mapred.nativetask.util.LocalJobOutputFiles");
+    jobConf.set("mapred.local.dir", localDir);
+    this.taskContext = new TaskContext(jobConf, BytesWritable.class, BytesWritable.class,
+        BytesWritable.class, BytesWritable.class, null, null);
+
+    Mockito.when(nativeHandler.getInputBuffer()).thenReturn(new InputBuffer(BufferType.HEAP_BUFFER, 100));
+  }
+
+  /** One collect call reaches the pusher once; closing twice closes pusher, combiner and native handler once each. */
+  public void testCollect() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner);
+    handler.collect(new BytesWritable(), new BytesWritable(), 100);
+    handler.close();
+    handler.close();
+
+    Mockito.verify(pusher, Mockito.times(1)).collect(Matchers.any(BytesWritable.class),
+        Matchers.any(BytesWritable.class), Matchers.anyInt());
+
+    Mockito.verify(pusher, Mockito.times(1)).close();
+    Mockito.verify(combiner, Mockito.times(1)).close();
+    Mockito.verify(nativeHandler, Mockito.times(1)).close();
+  }
+
+  /** GET_COMBINE_HANDLER answers with a buffer from which the combiner's id can be read as a long. */
+  public void testGetCombiner() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner);
+    Mockito.when(combiner.getId()).thenReturn(100L);
+    final ReadWriteBuffer result = handler.onCall(NativeCollectorOnlyHandler.GET_COMBINE_HANDLER, null);
+    Assert.assertEquals(100L, result.readLong());
+  }
+
+  /** Command -1 raises IOException; the path commands return the expected files under localDir/output. */
+  public void testOnCall() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner);
+    // Fail with an accurate message if the call returns normally instead of throwing.
+    try {
+      handler.onCall(new Command(-1), null);
+      Assert.fail("expected IOException for unknown command -1");
+    } catch (final IOException expected) {
+      // expected: this is the outcome the test requires
+    }
+
+    final String expectedOutputPath = localDir + "/output/file.out";
+    final String expectedOutputIndexPath = localDir + "/output/file.out.index";
+    final String expectedSpillPath = localDir + "/output/spill0.out";
+
+    final String outputPath = handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_PATH, null).readString();
+    Assert.assertEquals(expectedOutputPath, outputPath);
+
+    final String outputIndexPath = handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_INDEX_PATH, null).readString();
+    Assert.assertEquals(expectedOutputIndexPath, outputIndexPath);
+
+    final String spillPath = handler.onCall(NativeCollectorOnlyHandler.GET_SPILL_PATH, null).readString();
+    Assert.assertEquals(expectedSpillPath, spillPath);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream;
+import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TestKVSerializer extends TestCase {
+
+  int inputArraySize = 1000; // 1000 BytesWritable key/value pairs
+  int bufferSize = 100; // bytes
+  private KV<BytesWritable, BytesWritable>[] inputArray;
+
+  final ByteArrayOutputStream result = new ByteArrayOutputStream();
+  private SizedWritable key;
+  private SizedWritable value;
+  private KVSerializer serializer;
+
+  /** Fetches the inputs and primes key/value (with updated lengths) from the pair at index 4. */
+  @Override
+  @Before
+  public void setUp() throws IOException {
+    this.inputArray = TestInput.getMapInputs(inputArraySize);
+    this.key = new SizedWritable(BytesWritable.class);
+    this.value = new SizedWritable(BytesWritable.class);
+
+    this.serializer = new KVSerializer(BytesWritable.class, BytesWritable.class);
+
+    key.reset(inputArray[4].key);
+    value.reset(inputArray[4].value);
+    serializer.updateLength(key, value);
+  }
+
+  /** After updateLength, the combined key+value length must be strictly larger for every successive input pair. */
+  public void testUpdateLength() throws IOException {
+    int kvLength = 0;
+    for (int i = 0; i < inputArraySize; i++) {
+      key.reset(inputArray[i].key);
+      value.reset(inputArray[i].value);
+      serializer.updateLength(key, value);
+
+      // verify that the combined size increases with every pair
+      Assert.assertTrue(key.length + value.length > kvLength);
+      kvLength = key.length + value.length;
+    }
+  }
+
+  /** When the stream reports it is short of space, serializeKV flushes once and writes the pair. */
+  public void testSerializeKV() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(dataOut.shortOfSpace(key.length + value.length + Constants.SIZEOF_KV_LENGTH)).thenReturn(true);
+    final int written = serializer.serializeKV(dataOut, key, value);
+
+    // flush once, write 4 ints and 2 byte arrays
+    Mockito.verify(dataOut, Mockito.times(1)).flush();
+    Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+
+    Assert.assertEquals(key.length + value.length + Constants.SIZEOF_KV_LENGTH, written);
+  }
+
+  /** When the stream reports enough space, serializeKV writes the pair without flushing. */
+  public void testSerializeNoFlush() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+    // pretend there is enough space for a record of any size
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(dataOut.shortOfSpace(Matchers.anyInt())).thenReturn(false);
+    final int written = serializer.serializeKV(dataOut, key, value);
+
+    // no flush, write 4 ints and 2 byte arrays
+    Mockito.verify(dataOut, Mockito.times(0)).flush();
+    Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+
+    Assert.assertEquals(key.length + value.length + Constants.SIZEOF_KV_LENGTH, written);
+  }
+
+  /** serializePartitionKV flushes once when short of space and writes one int more than serializeKV. */
+  public void testSerializePartitionKV() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(dataOut.shortOfSpace(
+        key.length + value.length + Constants.SIZEOF_KV_LENGTH + Constants.SIZEOF_PARTITION_LENGTH)).thenReturn(true);
+    final int written = serializer.serializePartitionKV(dataOut, 100, key, value);
+
+    // flush once, write 5 ints (one more than serializeKV, presumably the partition id) and 2 byte arrays
+    Mockito.verify(dataOut, Mockito.times(1)).flush();
+    Mockito.verify(dataOut, Mockito.times(5)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+
+    Assert.assertEquals(key.length + value.length + Constants.SIZEOF_KV_LENGTH
+        + Constants.SIZEOF_PARTITION_LENGTH, written);
+  }
+
+  /** deserializeKV returns 0 when the stream has no unread data. */
+  public void testDeserializerNoData() throws IOException {
+    final DataInputStream in = Mockito.mock(DataInputStream.class);
+    Mockito.when(in.hasUnReadData()).thenReturn(false);
+    Assert.assertEquals(0, serializer.deserializeKV(in, key, value));
+  }
+
+  /** With unread data, deserializeKV reads 4 ints and 2 byte ranges and returns a positive value. */
+  public void testDeserializer() throws IOException {
+    final DataInputStream in = Mockito.mock(DataInputStream.class);
+    Mockito.when(in.hasUnReadData()).thenReturn(true);
+    Assert.assertTrue(serializer.deserializeKV(in, key, value) > 0);
+
+    Mockito.verify(in, Mockito.times(4)).readInt();
+    Mockito.verify(in, Mockito.times(2)).readFully(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+@SuppressWarnings({ "rawtypes", "deprecation" })
+public class TestNativeSerialization extends TestCase {
+  /** Registered classes get their serializer, other Writables get DefaultSerializer, non-Writables are rejected. */
+  public void testRegisterAndGet() throws IOException {
+    final NativeSerialization serialization = NativeSerialization.getInstance();
+    serialization.reset();
+    serialization.register(WritableKey.class.getName(), ComparableKeySerializer.class);
+
+    INativeSerializer serializer = serialization.getSerializer(WritableKey.class);
+    Assert.assertEquals(ComparableKeySerializer.class.getName(), serializer.getClass().getName());
+
+    serializer = serialization.getSerializer(WritableValue.class);
+    Assert.assertEquals(DefaultSerializer.class.getName(), serializer.getClass().getName());
+
+    try {
+      serialization.getSerializer(NonWritableValue.class);
+      Assert.fail("expected IOException for a type that does not implement Writable");
+    } catch (final IOException expected) {
+      // expected: NonWritableValue is not a Writable
+    }
+  }
+
+  /** Writable stub holding one int; write/readFields are no-ops and getLength() always returns 4. */
+  public static class WritableKey implements Writable {
+    private int value;
+
+    public WritableKey(int a) {
+      this.value = a;
+    }
+
+    public int getLength() {
+      return 4;
+    }
+
+    public int getValue() {
+      return value;
+    }
+
+    public void setValue(int v) {
+      this.value = v;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
+  /** Writable stub without state; it is never registered, so the test expects DefaultSerializer for it. */
+  public static class WritableValue implements Writable {
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
+  /** Does not implement Writable; the test expects getSerializer to throw IOException for it. */
+  public static class NonWritableValue {
+  }
+
+  /** Serializer for WritableKey that writes and reads the key's value as a single int. */
+  public static class ComparableKeySerializer implements INativeComparable, INativeSerializer<WritableKey> {
+    @Override
+    public int getLength(WritableKey w) throws IOException {
+      return w.getLength();
+    }
+
+    @Override
+    public void serialize(WritableKey w, DataOutput out) throws IOException {
+      out.writeInt(w.getValue());
+    }
+
+    @Override
+    public void deserialize(DataInput in, int length, WritableKey w) throws IOException {
+      w.setValue(in.readInt());
+    }
+  }
+}