You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [16/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdarg.h>
+#include "commons.h"
+#include "util/Random.h"
+#include "FileSystem.h"
+#include "test_commons.h"
+
+Config TestConfig = Config();
+
+const char * GenerateSeed = "generate.seed";
+const char * GenerateChoice = "generate.choice";
+const char * GenerateLen = "generate.len";
+const char * GenerateKeyLen = "generate.key.len";
+const char * GenerateValueLen = "generate.value.len";
+const char * GenerateRange = "generate.range";
+const char * GenerateKeyRange = "generate.key.range";
+const char * GenerateValueRange = "generate.value.range";
+
+/**
+ * Append every C-string variadic argument to dest, stopping at the first
+ * NULL argument, which acts as the terminator and is not appended.
+ * NOTE(review): va_start on a parameter of reference type is undefined
+ * behaviour in standard C++; the signature is kept for existing callers.
+ * @param dest vector receiving the strings
+ * @return dest, to allow chaining
+ */
+vector<string> & MakeStringArray(vector<string> & dest, ...) {
+  va_list args;
+  va_start(args, dest);
+  for (const char * item = va_arg(args, const char *); item != NULL;
+      item = va_arg(args, const char *)) {
+    dest.push_back(item);
+  }
+  va_end(args);
+  return dest;
+}
+
+/**
+ * Map a textual generator name to its GenerateType value.
+ * @param type one of "word", "number" or "bytes"
+ * @return the matching enumerator
+ * Throws UnsupportException for any other name.
+ */
+GenerateType GetGenerateType(const string & type) {
+  if (type == "word") {
+    return GenWord;
+  }
+  if (type == "number") {
+    return GenNumber;
+  }
+  if (type == "bytes") {
+    return GenBytes;
+  }
+  THROW_EXCEPTION(UnsupportException, "GenerateType not support");
+}
+
+/**
+ * Produce one random string into dest.
+ * @param dest   output string, overwritten
+ * @param r      random source
+ * @param gtype  kind of data to produce
+ * @param choice for GenWord it is passed to nextWord; for GenNumber a
+ *               positive value bounds the number, otherwise a 64-bit
+ *               value is drawn
+ * @param len    requested length; values <= 0 mean "not fixed"
+ * @param range  for GenBytes only: when >= 2 the length is varied by a
+ *               random amount drawn from range
+ * @return dest
+ */
+string & GenerateOne(string & dest, Random & r, GenerateType gtype, int64_t choice, int64_t len,
+    int64_t range) {
+  switch (gtype) {
+  case GenWord: {
+    r.nextWord(dest, choice);
+    break;
+  }
+  case GenNumber: {
+    const uint64_t number = (choice > 0) ? r.next_int32(choice) : r.next_uint64();
+    // A positive len asks for a zero-padded rendering of that width.
+    dest = (len > 0) ? StringUtil::ToString(number, '0', len) : StringUtil::ToString(number);
+    break;
+  }
+  case GenBytes: {
+    if (len > 0) {
+      if (range < 2) {
+        dest = r.nextBytes(len, "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
+      } else {
+        // Shift the requested length by a random offset taken from range.
+        const int64_t jittered = len - range / 2 + r.next_int32(range);
+        if (jittered > 0) {
+          dest = r.nextBytes(jittered, "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
+        } else {
+          dest = "";
+        }
+      }
+    } else {
+      // No fixed length: draw one, bounded by 32 unless a range is given.
+      if (range < 2) {
+        dest = r.nextBytes(r.next_int32(32), "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
+      } else {
+        dest = r.nextBytes(r.next_int32(range), "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
+      }
+    }
+    break;
+  }
+  default:
+    THROW_EXCEPTION(IOException, "GenerateType not support");
+  }
+  return dest;
+}
+
+/**
+ * Append randomly generated strings to dest.
+ * The seed, choice, length and range are read from TestConfig.
+ * @param dest dest array
+ * @param size number of strings to append
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+vector<string> & Generate(vector<string> & dest, uint64_t size, const string & type) {
+  Random rng;
+  if (NULL != TestConfig.get(GenerateSeed)) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t len = TestConfig.getInt(GenerateLen, -1);
+  const int64_t range = TestConfig.getInt(GenerateRange, 1);
+  string item;
+  for (uint64_t remaining = size; remaining > 0; remaining--) {
+    GenerateOne(item, rng, kind, choice, len, range);
+    dest.push_back(item);
+  }
+  return dest;
+}
+
+/**
+ * Append randomly generated (key, value) pairs to dest.
+ * The seed, choice, key/value lengths and ranges are read from TestConfig.
+ * @param dest dest array
+ * @param size number of pairs to append
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+vector<pair<string, string> > & Generate(vector<pair<string, string> > & dest, uint64_t size,
+    const string & type) {
+  Random rng;
+  if (NULL != TestConfig.get(GenerateSeed)) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t keylen = TestConfig.getInt(GenerateKeyLen, -1);
+  const int64_t valuelen = TestConfig.getInt(GenerateValueLen, -1);
+  const int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  const int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string k, v;
+  for (uint64_t remaining = size; remaining > 0; remaining--) {
+    // The key is always drawn before the value.
+    GenerateOne(k, rng, kind, choice, keylen, keyRange);
+    GenerateOne(v, rng, kind, choice, valuelen, valueRange);
+    dest.push_back(pair<string, string>(k, v));
+  }
+  return dest;
+}
+
+/**
+ * Append random (key, value) pairs to dest until their accumulated size
+ * reaches length. Each pair counts as key bytes + value bytes + 2.
+ * The seed, choice, key/value lengths and ranges are read from TestConfig.
+ * @param dest dest array
+ * @param length output bytes count
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+vector<pair<string, string> > & GenerateLength(vector<pair<string, string> > & dest,
+    uint64_t length, const string & type) {
+  Random r;
+  if (TestConfig.get(GenerateSeed) != NULL) {
+    r.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  GenerateType gtype = GetGenerateType(type);
+  int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  int64_t keylen = TestConfig.getInt(GenerateKeyLen, -1);
+  int64_t valuelen = TestConfig.getInt(GenerateValueLen, -1);
+  int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string key, value;
+  // Pre-size only when both lengths are configured. With the default -1
+  // values the sum can be zero (division by zero) or negative (converted
+  // to a huge unsigned divisor), so no estimate is possible.
+  if (keylen > 0 && valuelen > 0) {
+    dest.reserve((size_t)(length / (uint64_t)(keylen + valuelen) * 1.2));
+  }
+  for (uint64_t i = 0; i < length;) {
+    GenerateOne(key, r, gtype, choice, keylen, keyRange);
+    GenerateOne(value, r, gtype, choice, valuelen, valueRange);
+    dest.push_back(std::make_pair(key, value));
+    // The +2 guarantees progress even when both strings are empty.
+    i += (key.length() + value.length() + 2);
+  }
+  return dest;
+}
+
+/**
+ * Append random KV text to dest, one pair per line:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * The seed, choice, key/value lengths and ranges are read from TestConfig.
+ * @param dest dest string that receives the generated text
+ * @param size number of key/value lines to append
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+string & GenerateKVText(string & dest, uint64_t size, const string & type) {
+  Random rng;
+  if (NULL != TestConfig.get(GenerateSeed)) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t keylen = TestConfig.getInt(GenerateKeyLen, -1);
+  const int64_t valuelen = TestConfig.getInt(GenerateValueLen, -1);
+  const int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  const int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string k, v;
+  for (uint64_t remaining = size; remaining > 0; remaining--) {
+    GenerateOne(k, rng, kind, choice, keylen, keyRange);
+    GenerateOne(v, rng, kind, choice, valuelen, valueRange);
+    dest += k;
+    dest += '\t';
+    dest += v;
+    dest += '\n';
+  }
+  return dest;
+}
+
+/**
+ * Append random KV text to dest, one pair per line, until dest is at
+ * least length bytes long:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * The seed, choice, key/value lengths and ranges are read from TestConfig.
+ * @param dest dest string that receives the generated text
+ * @param length minimum length of dest on return
+ * @param type string type (word|number|bytes)
+ * @return dest
+ */
+string & GenerateKVTextLength(string & dest, uint64_t length, const string & type) {
+  Random rng;
+  if (NULL != TestConfig.get(GenerateSeed)) {
+    rng.setSeed(TestConfig.getInt(GenerateSeed, 0));
+  }
+  const GenerateType kind = GetGenerateType(type);
+  const int64_t choice = TestConfig.getInt(GenerateChoice, -1);
+  const int64_t keylen = TestConfig.getInt(GenerateKeyLen, -1);
+  const int64_t valuelen = TestConfig.getInt(GenerateValueLen, -1);
+  const int64_t keyRange = TestConfig.getInt(GenerateKeyRange, 1);
+  const int64_t valueRange = TestConfig.getInt(GenerateValueRange, 1);
+  string k, v;
+  for (; dest.length() < length;) {
+    GenerateOne(k, rng, kind, choice, keylen, keyRange);
+    GenerateOne(v, rng, kind, choice, valuelen, valueRange);
+    dest += k;
+    dest += '\t';
+    dest += v;
+    dest += '\n';
+  }
+  return dest;
+}
+
+/**
+ * File <-> String utilities
+ */
+/**
+ * Append the whole content of a file to dest.
+ * @param dest string receiving the bytes (existing content is kept)
+ * @param path file to read, opened in binary mode
+ * @return dest
+ * Throws IOException when the file cannot be opened or a read fails.
+ */
+string & ReadFile(string & dest, const string & path) {
+  FILE * fin = fopen(path.c_str(), "rb");
+  if (NULL == fin) {
+    THROW_EXCEPTION(IOException, "file not found or can not open for read");
+  }
+  char buff[1024 * 16];
+  size_t rd;
+  while ((rd = fread(buff, 1, sizeof(buff), fin)) > 0) {
+    dest.append(buff, rd);
+  }
+  // fread returns 0 for both EOF and error; tell them apart before closing
+  // so that a failed read is not mistaken for a short file.
+  const bool failed = (ferror(fin) != 0);
+  fclose(fin);
+  if (failed) {
+    THROW_EXCEPTION(IOException, "read file error");
+  }
+  return dest;
+}
+
+/**
+ * Write content to a file, replacing any previous content.
+ * @param content bytes to write
+ * @param path file to create or truncate, opened in binary mode
+ * Throws IOException when the file cannot be opened or written.
+ */
+void WriteFile(const string & content, const string & path) {
+  FILE * fout = fopen(path.c_str(), "wb");
+  if (NULL == fout) {
+    THROW_EXCEPTION(IOException, "file can not open for write");
+  }
+  size_t wt = fwrite(content.c_str(), 1, content.length(), fout);
+  // Always close before reporting, so a short write does not leak the
+  // handle. A failing fclose also means buffered data was not flushed.
+  const bool closed = (fclose(fout) == 0);
+  if (wt != content.length() || !closed) {
+    THROW_EXCEPTION(IOException, "write file error");
+  }
+}
+
+/**
+ * Compare two files byte for byte by loading both fully into memory.
+ * @param lh path of the first file
+ * @param rh path of the second file
+ * @return true when both files have identical content
+ */
+bool FileEqual(const string & lh, const string & rh) {
+  string left;
+  ReadFile(left, lh);
+  string right;
+  ReadFile(right, rh);
+  return left.compare(right) == 0;
+}
+
+/**
+ * Allocate the key/value scratch buffers and prepare the key format.
+ * @param keylen length reported for every key
+ * @param vallen length reported for every value
+ * @param unique when true, key() never returns the same random number twice
+ */
+KVGenerator::KVGenerator(uint32_t keylen, uint32_t vallen, bool unique)
+    : keylen(keylen), vallen(vallen), unique(unique), factor(2999999),
+      keyb(new char[keylen + 32]), valb(new char[vallen + 32]) {
+  // Builds "%0<keylen>lx": a long printed as hex, zero padded to keylen.
+  snprintf(keyformat, sizeof(keyformat), "%%0%ulx", keylen);
+}
+
+/** Release the scratch buffers allocated by the constructor. */
+KVGenerator::~KVGenerator() {
+  delete[] valb;
+  delete[] keyb;
+}
+
+/**
+ * Produce the next key as zero-padded hexadecimal text.
+ * @param kl receives the key length (always keylen)
+ * @return pointer to the internal key buffer, overwritten by the next call
+ */
+char * KVGenerator::key(uint32_t & kl) {
+  long v = lrand48();
+  if (unique) {
+    // Redraw until the number is one this generator has not used before.
+    while (!old_keys.insert(v).second) {
+      v = lrand48();
+    }
+  }
+  snprintf(keyb, keylen + 32, keyformat, v);
+  kl = keylen;
+  return keyb;
+}
+
+/**
+ * Produce the next value: tab-terminated hexadecimal fields written one
+ * after another until at least vallen bytes have been produced.
+ * @param vl receives the value length (always vallen)
+ * @return pointer to the internal value buffer, overwritten by the next call
+ */
+char * KVGenerator::value(uint32_t & vl) {
+  for (uint32_t off = 0; off < vallen;) {
+    // Round the random number down to a multiple of factor.
+    const long rounded = (lrand48() / factor) * factor;
+    const uint32_t written = snprintf(valb + off, vallen + 32 - off, "%09lx\t", rounded);
+    off += written;
+  }
+  vl = vallen;
+  return valb;
+}
+
+/**
+ * Write "key\tvalue\n" records to fout until about totallen bytes have
+ * been produced, then flush the stream.
+ * @param fout destination stream
+ * @param totallen number of bytes to produce; the last record may overshoot
+ */
+void KVGenerator::write(FILE * fout, int64_t totallen) {
+  for (; totallen > 0;) {
+    uint32_t kl, vl;
+    char * k = this->key(kl);
+    char * v = this->value(vl);
+    fwrite(k, kl, 1, fout);
+    fputc('\t', fout);
+    fwrite(v, vl, 1, fout);
+    fputc('\n', fout);
+    // The two extra bytes are the tab and the newline.
+    totallen -= (kl + vl + 2);
+  }
+  fflush(fout);
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/test_commons.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TEST_COMMONS_H_
+#define TEST_COMMONS_H_
+
+#include "gtest/gtest.h"
+#include "commons.h"
+#include "util/Random.h"
+#include "util/StringUtil.h"
+#include "util/Timer.h"
+#include "Buffers.h"
+#include "BufferStream.h"
+
+using std::pair;
+using std::vector;
+using std::set;
+using std::map;
+using std::string;
+
+using namespace NativeTask;
+
+extern Config TestConfig;
+
+/**
+ * e.g. MakeStringArray(dest, "a", "b", "c", NULL) = {"a","b","c"}
+ */
+vector<string> & MakeStringArray(vector<string> & dest, ...);
+
+extern const char * GenerateSeed;
+extern const char * GenerateChoice;
+extern const char * GenerateLen;
+extern const char * GenerateKeyLen;
+extern const char * GenerateValueLen;
+extern const char * GenerateRange;
+extern const char * GenerateKeyRange;
+extern const char * GenerateValueRange;
+
+/** Kind of random data produced by the Generate* helpers. */
+enum GenerateType {
+  GenWord = 0,   // selected by the type name "word"
+  GenNumber = 1, // selected by the type name "number"
+  GenBytes = 2   // selected by the type name "bytes"
+};
+
+GenerateType GetGenerateType(const string & type);
+
+string & GenerateOne(string & dest, Random & r, GenerateType gtype, int64_t choice, int64_t len,
+ int64_t range = 0);
+/**
+ * Generate random string sequences
+ * @param dest dest array
+ * @param size output array size
+ * @param type string type (word|number|bytes)
+ */
+vector<string> & Generate(vector<string> & dest, uint64_t size, const string & type);
+
+/**
+ * Generate random string pair sequences
+ * @param dest dest array
+ * @param size output array size
+ * @param type string type (word|number|bytes)
+ */
+vector<pair<string, string> > & Generate(vector<pair<string, string> > & dest, uint64_t size,
+ const string & type);
+
+/**
+ * Generate random string pair sequences
+ * @param dest dest array
+ * @param length output bytes count
+ * @param type string type (word|number|bytes)
+ */
+vector<pair<string, string> > & GenerateLength(vector<pair<string, string> > & dest,
+ uint64_t length, const string & type);
+
+/**
+ * Generate random KV text:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * @param dest dest string contain generated text
+ * @param size output kv pair count
+ * @param type string type (word|number|bytes)
+ */
+string & GenerateKVText(string & dest, uint64_t size, const string & type);
+
+/**
+ * Generate random KV text:
+ * Key0\tValue0\n
+ * Key1\tValue1\n
+ * ...
+ * @param dest dest string contain generated text
+ * @param length output string length
+ * @param type string type (word|number|bytes)
+ */
+string & GenerateKVTextLength(string & dest, uint64_t length, const string & type);
+
+/**
+ * File <-> String utilities
+ */
+string & ReadFile(string & dest, const string & path);
+void WriteFile(const string & content, const string & path);
+
+/**
+ * File compare
+ */
+bool FileEqual(const string & lh, const string & rh);
+
+/**
+ * Generates key/value pairs as hexadecimal text (described by the original
+ * author as having a "normal compression ratio").
+ *
+ * key() and value() return pointers into buffers owned by this object;
+ * each call overwrites the previous result.
+ */
+class KVGenerator {
+protected:
+  uint32_t keylen;        // length reported for every key
+  uint32_t vallen;        // length reported for every value
+  bool unique;            // when true, key() never reuses a random number
+  long factor;            // random value numbers are rounded down to a multiple of this
+  char * keyb;            // key buffer, owned (new[] / delete[])
+  char * valb;            // value buffer, owned (new[] / delete[])
+  char keyformat[32];     // printf format used to render a key
+  set<int64_t> old_keys;  // numbers already used, consulted when unique is set
+
+public:
+  KVGenerator(uint32_t keylen, uint32_t vallen, bool unique = false);
+
+  ~KVGenerator();
+
+  char * key(uint32_t & kl);
+
+  char * value(uint32_t & vl);
+
+  void write(FILE * fout, int64_t totallen);
+
+private:
+  // Not copyable: a copy would share keyb/valb and delete them twice.
+  // Declared but intentionally left undefined.
+  KVGenerator(const KVGenerator &);
+  KVGenerator & operator=(const KVGenerator &);
+};
+
+#endif /* TEST_COMMONS_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestChecksum.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/Checksum.h"
+#include "test_commons.h"
+
+/**
+ * Run one checksum pass of the given type over a buffer; the resulting
+ * checksum value is discarded because only the elapsed time matters.
+ * @param type checksum algorithm
+ * @param buff data to checksum
+ * @param len number of bytes in buff
+ */
+void TestChecksum(ChecksumType type, void * buff, uint32_t len) {
+  uint32_t state = Checksum::init(type);
+  Checksum::update(type, state, buff, len);
+}
+
+/**
+ * Throughput measurement for the CRC32 and CRC32C checksums.
+ * Config keys: checksum.perf.size (buffer bytes), checksum.perf.time
+ * (number of passes over the buffer).
+ */
+TEST(Perf, CRC) {
+  uint32_t len = TestConfig.getInt("checksum.perf.size", 1024 * 1024 * 50);
+  int testTime = TestConfig.getInt("checksum.perf.time", 2);
+  // Widen before multiplying: len * testTime in 32 bits wraps as soon as
+  // the total exceeds 4GB and would report a bogus speed.
+  const uint64_t totalBytes = testTime > 0 ? (uint64_t)len * (uint64_t)testTime : 0;
+  char * buff = new char[len];
+  memset(buff, 1, len);
+  Timer timer;
+  for (int i = 0; i < testTime; i++) {
+    TestChecksum(CHECKSUM_CRC32, buff, len);
+  }
+  LOG("%s", timer.getSpeedM("CRC", totalBytes).c_str());
+  timer.reset();
+  for (int i = 0; i < testTime; i++) {
+    TestChecksum(CHECKSUM_CRC32C, buff, len);
+  }
+  LOG("%s", timer.getSpeedM("CRC32C", totalBytes).c_str());
+  delete[] buff;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestHash.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/Hash.h"
+#include "test_commons.h"
+
+/**
+ * Time Hash::BytesHash and Hash::CityHash over generated byte strings.
+ * @param len      length of every generated string (stored in TestConfig)
+ * @param size     number of strings to generate
+ * @param loopTime number of passes over the data per hash function
+ * @return sum of all hash values, returned so the hashing work is used
+ */
+static uint64_t test_length(int64_t len, size_t size, size_t loopTime) {
+  vector<string> data;
+  TestConfig.setInt(GenerateLen, len);
+  Generate(data, size, "bytes");
+  Timer t;
+  // Must start at zero: the loops below only ever add to it.
+  uint64_t ret = 0;
+  for (size_t m = 0; m < loopTime; m++) {
+    for (size_t i = 0; i < data.size(); i++) {
+      ret += Hash::BytesHash(data[i].c_str(), data[i].length());
+    }
+  }
+  // %lld expects long long; int64_t may be plain long on LP64 platforms.
+  LOG("%s", t.getInterval(StringUtil::Format("Bytes%3lld", (long long)len).c_str()).c_str());
+  t.reset();
+  for (size_t m = 0; m < loopTime; m++) {
+    for (size_t i = 0; i < data.size(); i++) {
+      ret += Hash::CityHash(data[i].c_str(), data[i].length());
+    }
+  }
+  LOG("%s", t.getInterval(StringUtil::Format(" City%3lld", (long long)len).c_str()).c_str());
+  return ret;
+}
+
+/**
+ * Run the hash timing for several string lengths and print the combined
+ * hash sum to stderr.
+ */
+TEST(Perf, Hash) {
+  static const int64_t kLengths[] = { 1, 17, 64, 128, 513 };
+  uint64_t total = 0;
+  for (size_t i = 0; i < sizeof(kLengths) / sizeof(kLengths[0]); i++) {
+    total += test_length(kLengths[i], 100, 4000);
+  }
+  fprintf(stderr, "%llu\n", (long long unsigned int)total);
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestProcess.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/Process.h"
+#include "test_commons.h"
+
+/**
+ * Run an external command (config key process.run.cmd, default "ls"),
+ * log what it printed and expect a zero exit code.
+ */
+TEST(Process, Run) {
+  string captured_out;
+  string captured_err;
+  const string command = TestConfig.get("process.run.cmd", "ls");
+  const int status = Process::Run(command, &captured_out, &captured_err);
+  LOG("cmd: %s", command.c_str());
+  LOG("stdout: %s", captured_out.c_str());
+  LOG("stderr: %s", captured_err.c_str());
+  LOG("retcode: %d", status);
+  EXPECT_EQ(0, status);
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestStringUtil.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/StringUtil.h"
+#include "test_commons.h"
+
+/**
+ * Checks StringUtil::toFloat and the StringUtil::ToString overloads,
+ * including the padded form ToString(value, pad, width).
+ */
+TEST(StringUtil, Convertion) {
+  ASSERT_FLOAT_EQ(StringUtil::toFloat("1.333"), 1.333);
+  ASSERT_FLOAT_EQ(StringUtil::toFloat(StringUtil::ToString(1.333f)), 1.333);
+  ASSERT_EQ(StringUtil::ToString(76957382U), "76957382");
+  ASSERT_EQ(StringUtil::ToString((uint64_t )76957382234233432ULL), "76957382234233432");
+  // Padding to width 40 with ' ' gives 37 spaces before the 3 digits.
+  // Built programmatically so the expected value survives tools that
+  // collapse runs of spaces in a literal.
+  ASSERT_EQ(StringUtil::ToString(111, ' ', 40), string(37, ' ') + "111");
+}
+
+/**
+ * Checks StringUtil::Format with mixed arguments and with a 999-character
+ * string argument.
+ */
+TEST(StringUtil, Format) {
+  const string mixed = StringUtil::Format("%d %d %d %.3lf %s", 1, 2, 3, 1.333, "aaaaaaaaaaa");
+  ASSERT_EQ(mixed, "1 2 3 1.333 aaaaaaaaaaa");
+  const string longstring(999, 'a');
+  const string echoed = StringUtil::Format("%s", longstring.c_str());
+  ASSERT_EQ(longstring, echoed);
+}
+
+/**
+ * Checks StringUtil::Trim on padded, whitespace-only and empty input.
+ */
+TEST(StringUtil, Trim) {
+  const string padded = StringUtil::Trim(" \taaaa \t ");
+  ASSERT_EQ(padded, "aaaa");
+  const string blank = StringUtil::Trim(" \t \t ");
+  ASSERT_EQ(blank, "");
+  const string empty = StringUtil::Trim("");
+  ASSERT_EQ(empty, "");
+}
+
+/**
+ * Checks StringUtil::ToLower on mixed-case text with digits and on the
+ * empty string.
+ */
+TEST(StringUtil, ToLower) {
+  const string lowered = StringUtil::ToLower("111ABabABabAbaB222");
+  ASSERT_EQ(lowered, "111abababababab222");
+  const string empty = StringUtil::ToLower("");
+  ASSERT_EQ(empty, "");
+}
+
+/**
+ * Checks StringUtil::Split with and without cleaning, and
+ * StringUtil::Join as its inverse.
+ */
+TEST(StringUtil, JoinSplit) {
+  vector<string> temp1, temp2, temp3, temp4;
+  StringUtil::Split("1aaa bbb ccc", " ", temp1, false);
+  StringUtil::Split(" 1aaa bbb ccc ", " ", temp2, true);
+  ASSERT_EQ(temp1, temp2);
+  string j = StringUtil::Join(temp1, ",");
+  ASSERT_EQ(j, "1aaa,bbb,ccc");
+  // Two leading separators and one trailing separator are needed to get
+  // the two leading and one trailing empty fields expected below.
+  StringUtil::Split("  a b ", " ", temp3, false);
+  // The terminator is read with va_arg as a pointer, so pass a real
+  // pointer: a bare NULL may be an int-sized 0 in a variadic call.
+  ASSERT_EQ(temp3, MakeStringArray(temp4, "", "", "a", "b", "", (const char *)NULL));
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestSyncUtils.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/SyncUtils.h"
+#include "test_commons.h"
+
+/**
+ * Thread whose body sleeps briefly five times, logging after each sleep.
+ */
+class TestThread : public Thread {
+  virtual void run() {
+    for (uint32_t i = 0; i < 5; i++) {
+      usleep(100);
+      // i is unsigned, so the conversion must be %u rather than %d.
+      LOG("sleep %u", i * 100);
+    }
+  }
+};
+
+/**
+ * Start three TestThread instances, then join them in start order.
+ */
+TEST(SyncUtil, Thread) {
+  TestThread first;
+  TestThread second;
+  TestThread third;
+  first.start();
+  second.start();
+  third.start();
+  first.join();
+  second.join();
+  third.join();
+}
+
+/**
+ * Target object for the BindNew test: member functions with different
+ * signatures that can be bound into a Runnable.
+ */
+class TestBind {
+public:
+
+  /** Returns a constant; bound as a member function with a return value. */
+  int get() {
+    return 100;
+  }
+
+  /** Sleeps briefly twice, logging after each sleep. */
+  void foo() {
+    for (uint32_t i = 0; i < 2; i++) {
+      usleep(100);
+      // i is unsigned, so the conversion must be %u rather than %d.
+      LOG("usleep %u", i * 100);
+    }
+  }
+
+  /** Same as foo(), but also logs the given message. */
+  void bar(const char * msg) {
+    for (uint32_t i = 0; i < 2; i++) {
+      usleep(100);
+      LOG("usleep %u %s", i * 100, msg);
+    }
+  }
+
+};
+
+/**
+ * Bind two TestBind member functions into Runnables, run each on its own
+ * Thread, join both and release the Runnables.
+ */
+TEST(SyncUtil, ThreadBind) {
+  TestBind target = TestBind();
+  Runnable * getter = BindNew(target, &TestBind::get);
+  Thread getterThread = Thread(getter);
+  Runnable * printer = BindNew(target, &TestBind::bar, "testmsg");
+  Thread printerThread = Thread(printer);
+  getterThread.start();
+  printerThread.start();
+  getterThread.join();
+  printerThread.join();
+
+  // The Runnables are deleted only after both threads have been joined.
+  delete getter;
+  delete printer;
+}
+
+//class TestParallelFor {
+//protected:
+// SpinLock lock;
+// uint64_t aggr;
+//public:
+// TestParallelFor() : aggr(0) {
+// }
+// void add(uint64_t i) {
+// lock.lock();
+// aggr += i;
+// lock.unlock();
+// }
+// void test(uint64_t n, size_t threadnum) {
+// aggr = 0;
+// ParallelFor(*this, &TestParallelFor::add, 0ULL, n, threadnum);
+// ASSERT_EQ(n*(n-1)/2, aggr);
+// }
+//};
+//
+//TEST(SyncUtil, ParallelFor) {
+// TestParallelFor tpf;
+// tpf.test(100000, 2);
+// tpf.test(100000, 3);
+// tpf.test(100000, 4);
+//}
+
+/**
+ * Time starting and joining many default-constructed threads.
+ * Config key: thread.num (default 1000).
+ */
+TEST(Perf, ThreadOverhead) {
+  const int64_t threadnum = TestConfig.getInt("thread.num", 1000);
+  Thread * pool = new Thread[threadnum];
+  Timer timer;
+  uint32_t idx;
+  for (idx = 0; idx < threadnum; idx++) {
+    pool[idx].start();
+  }
+  for (idx = 0; idx < threadnum; idx++) {
+    pool[idx].join();
+  }
+  LOG("%lld thread %s", (long long int )threadnum, timer.getInterval("start&join").c_str());
+  delete[] pool;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/util/TestWritableUtils.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/WritableUtils.h"
+#include "test_commons.h"
+
+/**
+ * Round-trip one value through WriteVLong / ReadVLong and check that the
+ * written size, the read size and GetVLongSize all agree.
+ * @param v value to encode and decode
+ */
+void TestVLong(int64_t v) {
+  char encoded[1024];
+  char copied[1024];
+  const uint32_t expectedSize = WritableUtils::GetVLongSize(v);
+  // Start from an impossible size so a missing update is detected.
+  uint32_t writtenSize = static_cast<uint32_t>(-1);
+  WritableUtils::WriteVLong(v, encoded, writtenSize);
+  ASSERT_EQ(expectedSize, writtenSize);
+  memcpy(copied, encoded, writtenSize);
+  uint32_t readSize;
+  const int64_t decoded = WritableUtils::ReadVLong(copied, readSize);
+  ASSERT_EQ(v, decoded);
+  ASSERT_EQ(readSize, expectedSize);
+}
+
+
+/**
+ * Round-trip random values and their negations through the VLong codec.
+ * Config keys: test.size (iterations), test.seed (random seed).
+ */
+TEST(WritableUtils, VLong) {
+  const int iterations = TestConfig.getInt("test.size", 3000);
+  const int seed = TestConfig.getInt("test.seed", -1);
+  Random rng(seed);
+  for (int done = 0; done < iterations; done++) {
+    const uint64_t magnitude = rng.nextLog2(((uint64_t)-1) / 2 - 3);
+    TestVLong(magnitude);
+    TestVLong(-magnitude);
+  }
+}
+
+
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/testData/testGlibCBugSpill.out Thu Jul 17 17:44:55 2014
@@ -0,0 +1,2 @@
[... 4 lines stripped ...]
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/resources/META-INF/services/org.apache.hadoop.mapred.nativetask.Platform Thu Jul 17 17:44:55 2014
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.hadoop.mapred.nativetask.HadoopPlatform
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/TestTaskContext.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+
+/**
+ * Checks that each key/value class stored on a {@link TaskContext} through a
+ * setter is handed back by the matching getter.
+ */
+public class TestTaskContext extends TestCase {
+
+  /** Sets the four classes one after another and compares the class names read back. */
+  public void testTaskContext() {
+    final TaskContext ctx = new TaskContext(null, null, null, null, null, null, null);
+
+    ctx.setInputKeyClass(IntWritable.class);
+    final String inputKeyName = ctx.getInputKeyClass().getName();
+    assertEquals(IntWritable.class.getName(), inputKeyName);
+
+    ctx.setInputValueClass(Text.class);
+    final String inputValueName = ctx.getInputValueClass().getName();
+    assertEquals(Text.class.getName(), inputValueName);
+
+    ctx.setOutputKeyClass(LongWritable.class);
+    final String outputKeyName = ctx.getOuputKeyClass().getName();
+    assertEquals(LongWritable.class.getName(), outputKeyName);
+
+    ctx.setOutputValueClass(FloatWritable.class);
+    final String outputValueName = ctx.getOutputValueClass().getName();
+    assertEquals(FloatWritable.class.getName(), outputValueName);
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestBufferPushPull.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.nativetask.DataReceiver;
+import org.apache.hadoop.mapred.nativetask.NativeDataSource;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPullee;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPuller;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPushee;
+import org.apache.hadoop.mapred.nativetask.handlers.BufferPusher;
+import org.apache.hadoop.mapred.nativetask.handlers.IDataLoader;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.util.Progress;
+import org.junit.Before;
+
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestBufferPushPull extends TestCase {
+
+ // Length of the byte array that each test wraps in both an InputBuffer and an OutputBuffer.
+ public static int BUFFER_LENGTH = 100; // 100 bytes
+ // Number of key/value pairs requested from TestInput.getMapInputs in setUp().
+ public static int INPUT_KV_COUNT = 1000;
+ // Pairs loaded in setUp(); the tests use them both as input and as the expected output.
+ private KV<BytesWritable, BytesWritable>[] dataInput;
+
+  /** Loads the {@code INPUT_KV_COUNT} key/value pairs the tests in this class work on. */
+  @Before
+  @Override
+  public void setUp() {
+    dataInput = TestInput.getMapInputs(INPUT_KV_COUNT);
+  }
+
+ /**
+ * Collects every input pair into a BufferPusher and checks, inside the record
+ * writer given to the BufferPushee, that the pairs arrive with the same
+ * content and in the same order as in dataInput.
+ */
+ public void testPush() throws Exception {
+ // input and out wrap the same array, so what is written through out can be read through input.
+ final byte[] buff = new byte[BUFFER_LENGTH];
+
+ final InputBuffer input = new InputBuffer(buff);
+
+ final OutputBuffer out = new OutputBuffer(buff);
+
+ final Class<BytesWritable> iKClass = BytesWritable.class;
+ final Class<BytesWritable> iVClass = BytesWritable.class;
+
+ // Each write() call is compared with the next expected pair; count is the inherited record index.
+ final RecordWriterForPush writer = new RecordWriterForPush() {
+ @Override
+ public void write(BytesWritable key, BytesWritable value) throws IOException {
+ final KV expect = dataInput[count++];
+ Assert.assertEquals(expect.key.toString(), key.toString());
+ Assert.assertEquals(expect.value.toString(), value.toString());
+ }
+ };
+
+ final BufferPushee pushee = new BufferPushee(iKClass, iVClass, writer);
+
+ final PushTarget handler = new PushTarget(out) {
+
+ @Override
+ public void sendData() throws IOException {
+ // Expose the bytes written so far to the reading side, reset the
+ // writing side, then let the pushee consume them.
+ final int outputLength = out.length();
+ input.rewind(0, outputLength);
+ out.rewind();
+ pushee.collect(input);
+ }
+ };
+
+ final BufferPusher pusher = new BufferPusher(iKClass, iVClass, handler);
+
+ writer.reset();
+ for (int i = 0; i < INPUT_KV_COUNT; i++) {
+ pusher.collect(dataInput[i].key, dataInput[i].value);
+ }
+ pusher.close();
+ pushee.close();
+ }
+
+ /**
+ * Wires a BufferPullee (fed by a KeyValueIterator) and a BufferPuller to the
+ * same NativeHandlerForPull, then reads pairs from the puller and compares
+ * each one with the pair at the same position in dataInput.
+ */
+ public void testPull() throws Exception {
+ // input and out wrap the same array, so what is written through out can be read through input.
+ final byte[] buff = new byte[BUFFER_LENGTH];
+
+ final InputBuffer input = new InputBuffer(buff);
+
+ final OutputBuffer out = new OutputBuffer(buff);
+
+ final Class<BytesWritable> iKClass = BytesWritable.class;
+ final Class<BytesWritable> iVClass = BytesWritable.class;
+
+ final NativeHandlerForPull handler = new NativeHandlerForPull(input, out);
+
+ final KeyValueIterator iter = new KeyValueIterator();
+ final BufferPullee pullee = new BufferPullee(iKClass, iVClass, iter, handler);
+ handler.setDataLoader(pullee);
+
+ final BufferPuller puller = new BufferPuller(handler);
+ handler.setDataReceiver(puller);
+
+ // Index of the expected pair for the record currently being read.
+ int count = 0;
+
+ while (puller.next()) {
+ final DataInputBuffer key = puller.getKey();
+ final DataInputBuffer value = puller.getValue();
+
+ final BytesWritable keyBytes = new BytesWritable();
+ final BytesWritable valueBytes = new BytesWritable();
+
+ keyBytes.readFields(key);
+ valueBytes.readFields(value);
+
+ Assert.assertEquals(dataInput[count].key.toString(), keyBytes.toString());
+ Assert.assertEquals(dataInput[count].value.toString(), valueBytes.toString());
+
+ count++;
+ }
+ // NOTE(review): nothing asserts that count reached INPUT_KV_COUNT, so the
+ // test also passes if the puller yields fewer records (or none).
+
+ puller.close();
+ pullee.close();
+ }
+
+  /**
+   * NativeDataTarget backed by a caller-supplied OutputBuffer. Subclasses
+   * decide what sendData() does; finishSendData() simply sends once more.
+   */
+  public abstract class PushTarget implements NativeDataTarget {
+    // Left package-visible and named "out": subclasses read it directly.
+    OutputBuffer out;
+
+    PushTarget(OutputBuffer buffer) {
+      out = buffer;
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return this.out;
+    }
+
+    @Override
+    public abstract void sendData() throws IOException;
+
+    @Override
+    public void finishSendData() throws IOException {
+      this.sendData();
+    }
+  }
+
+  /**
+   * RecordWriter skeleton for the push test: it keeps a running record index
+   * that subclasses may use in write(), and close() does nothing.
+   */
+  public abstract class RecordWriterForPush implements RecordWriter<BytesWritable, BytesWritable> {
+
+    // Record index available to subclasses; starts at zero.
+    protected int count;
+
+    RecordWriterForPush() {
+      this.count = 0;
+    }
+
+    /** Sets the record index back to zero. */
+    public void reset() {
+      this.count = 0;
+    }
+
+    @Override
+    public abstract void write(BytesWritable key, BytesWritable value) throws IOException;
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      // Nothing to release.
+    }
+  }
+
+  /**
+   * Test double that implements both NativeDataSource and NativeDataTarget on
+   * top of one InputBuffer and one OutputBuffer. loadData() delegates to the
+   * configured IDataLoader; sendData() makes the bytes currently held by the
+   * output buffer visible through the input buffer and notifies the configured
+   * DataReceiver.
+   */
+  public static class NativeHandlerForPull implements NativeDataSource, NativeDataTarget {
+
+    InputBuffer in;
+    private final OutputBuffer out;
+
+    private IDataLoader dataLoader;
+    private DataReceiver dataReceiver;
+
+    public NativeHandlerForPull(InputBuffer input, OutputBuffer out) {
+      this.in = input;
+      this.out = out;
+    }
+
+    @Override
+    public InputBuffer getInputBuffer() {
+      return in;
+    }
+
+    @Override
+    public void setDataReceiver(DataReceiver handler) {
+      this.dataReceiver = handler;
+    }
+
+    /** Asks the data loader for more data; setDataLoader must have been called first. */
+    @Override
+    public void loadData() throws IOException {
+      // The value returned by load() is not used by this handler.
+      dataLoader.load();
+    }
+
+    public void setDataLoader(IDataLoader dataLoader) {
+      this.dataLoader = dataLoader;
+    }
+
+    /**
+     * Records the output length, rewinds the output buffer, rewinds the input
+     * buffer to cover that length, and then calls the receiver.
+     */
+    @Override
+    public void sendData() throws IOException {
+      final int len = out.length();
+      out.rewind();
+      in.rewind(0, len);
+      dataReceiver.receiveData();
+    }
+
+    /** Calls the receiver once more; unlike sendData(), no buffer is rewound here. */
+    @Override
+    public void finishSendData() throws IOException {
+      dataReceiver.receiveData();
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return this.out;
+    }
+  }
+
+  /**
+   * RawKeyValueIterator over dataInput: each successful next() selects the
+   * following pair, and getKey()/getValue() return that pair's key and value
+   * serialised with BytesWritable.write into a fresh DataInputBuffer.
+   */
+  public class KeyValueIterator implements RawKeyValueIterator {
+    // Index of the pair that the next call to next() will select.
+    int count = 0;
+    BytesWritable key;
+    BytesWritable value;
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      return convert(key);
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      return convert(value);
+    }
+
+    /** Serialises b and wraps the resulting bytes in a new DataInputBuffer. */
+    private DataInputBuffer convert(BytesWritable b) throws IOException {
+      final ByteArrayOutputStream out = new ByteArrayOutputStream();
+      b.write(new DataOutputStream(out));
+      final byte[] array = out.toByteArray();
+      final DataInputBuffer result = new DataInputBuffer();
+      result.reset(array, array.length);
+      return result;
+    }
+
+    /**
+     * Selects the next pair.
+     *
+     * @return true if a pair was selected, false once INPUT_KV_COUNT pairs have been handed out
+     */
+    @Override
+    public boolean next() throws IOException {
+      if (count < INPUT_KV_COUNT) {
+        key = dataInput[count].key;
+        // The value must come from the pair's value field, not from its key.
+        value = dataInput[count].value;
+        count++;
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestByteBufferReadWrite extends TestCase{
+
+
+  /**
+   * Writes one value of each kind through a ByteBufferDataWriter and reads
+   * them back, in the same order, with a ByteBufferDataReader over an
+   * InputBuffer that wraps the same byte array. At the end the input must be
+   * fully consumed.
+   */
+  public void testReadWrite() throws IOException {
+    byte[] buff = new byte[10000];
+
+    InputBuffer input = new InputBuffer(buff);
+    MockDataTarget target = new MockDataTarget(buff);
+    ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+
+    writer.write(1);
+    writer.write(new byte[] {2, 2}, 0, 2);
+    writer.writeBoolean(true);
+    writer.writeByte(4);
+    writer.writeShort(5);
+    writer.writeChar(6);
+    writer.writeInt(7);
+    writer.writeLong(8);
+    writer.writeFloat(9);
+    writer.writeDouble(10);
+    writer.writeBytes("goodboy");
+    writer.writeChars("hello");
+    writer.writeUTF("native task");
+
+    // Make exactly the bytes written so far readable through the input buffer.
+    int length = target.getOutputBuffer().length();
+    input.rewind(0, length);
+    ByteBufferDataReader reader = new ByteBufferDataReader(input);
+
+    Assert.assertEquals(1, reader.read());
+    byte[] two = new byte[2];
+    reader.read(two);
+    Assert.assertTrue(two[0] == two[1] && two[0] == 2);
+
+    Assert.assertEquals(true, reader.readBoolean());
+    Assert.assertEquals(4, reader.readByte());
+    Assert.assertEquals(5, reader.readShort());
+    Assert.assertEquals(6, reader.readChar());
+    Assert.assertEquals(7, reader.readInt());
+    Assert.assertEquals(8, reader.readLong());
+    // Symmetric tolerance: a value that is too small must fail just like one
+    // that is too large.
+    Assert.assertEquals(9f, reader.readFloat(), 0.0001f);
+    Assert.assertEquals(10d, reader.readDouble(), 0.0001d);
+
+    byte[] goodboy = new byte["goodboy".length()];
+    reader.read(goodboy);
+    Assert.assertEquals("goodboy", toString(goodboy));
+
+    // writeChars output is read back one char at a time.
+    char[] hello = new char["hello".length()];
+    for (int i = 0; i < hello.length; i++) {
+      hello[i] = reader.readChar();
+    }
+
+    String helloString = new String(hello);
+    Assert.assertEquals("hello", helloString);
+
+    Assert.assertEquals("native task", reader.readUTF());
+
+    Assert.assertEquals(0, input.remaining());
+  }
+
+  /**
+   * With a 10-byte target, a writer holding three unflushed bytes must report
+   * that it is short of space for 100 more.
+   */
+  public void testShortOfSpace() throws IOException {
+    final MockDataTarget sink = new MockDataTarget(new byte[10]);
+    final ByteBufferDataWriter out = new ByteBufferDataWriter(sink);
+    Assert.assertEquals(false, out.hasUnFlushedData());
+
+    final byte[] pair = {2, 2};
+    out.write(1);
+    out.write(pair, 0, 2);
+    Assert.assertEquals(true, out.hasUnFlushedData());
+
+    Assert.assertEquals(true, out.shortOfSpace(100));
+  }
+
+  /**
+   * Writes 101 bytes through a writer whose target buffer holds 10 bytes and,
+   * after close(), expects 11 sendData() calls and one finishSendData() call
+   * on the target.
+   */
+  public void testFlush() throws IOException {
+    final Flag finished = new Flag();
+    final Counter sends = new Counter();
+    final byte[] backing = new byte[10];
+
+    // Target that only records how it is called.
+    final MockDataTarget recordingTarget = new MockDataTarget(backing) {
+      @Override
+      public void sendData() throws IOException {
+        sends.increase();
+      }
+
+      @Override
+      public void finishSendData() throws IOException {
+        finished.set(true);
+      }
+    };
+
+    final ByteBufferDataWriter out = new ByteBufferDataWriter(recordingTarget);
+    Assert.assertEquals(false, out.hasUnFlushedData());
+
+    out.write(1);
+    final byte[] payload = new byte[100];
+    out.write(payload);
+
+    Assert.assertEquals(true, out.hasUnFlushedData());
+    out.close();
+    Assert.assertEquals(11, sends.get());
+    Assert.assertEquals(true, finished.get());
+  }
+
+ private static String toString(byte[] str) throws UnsupportedEncodingException {
+ return new String(str, 0, str.length, "UTF-8");
+ }
+
+  /**
+   * NativeDataTarget whose send methods do nothing; it only exposes an
+   * OutputBuffer wrapped around the array passed to the constructor.
+   */
+  private static class MockDataTarget implements NativeDataTarget {
+
+    private final OutputBuffer out;
+
+    MockDataTarget(byte[] backing) {
+      out = new OutputBuffer(backing);
+    }
+
+    @Override
+    public OutputBuffer getOutputBuffer() {
+      return this.out;
+    }
+
+    @Override
+    public void sendData() throws IOException {
+      // Intentionally empty.
+    }
+
+    @Override
+    public void finishSendData() throws IOException {
+      // Intentionally empty.
+    }
+  }
+
+  /** Plain int counter that starts at zero. */
+  private static class Counter {
+    private int count;
+
+    /** Adds one to the counter. */
+    public void increase() {
+      this.count += 1;
+    }
+
+    /** Returns the current count. */
+    public int get() {
+      return this.count;
+    }
+  }
+
+  /** Holder for a single boolean, initially false. */
+  private static class Flag {
+    private boolean value;
+
+    /** Returns the stored value. */
+    public boolean get() {
+      return value;
+    }
+
+    /** Replaces the stored value with {@code status}. */
+    public void set(boolean status) {
+      value = status;
+    }
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestDirectBufferPool.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+public class TestDirectBufferPool {
+
+  /**
+   * Calls DirectBufferPool.getInstance() from 100 threads and checks that
+   * every thread obtained the same, non-null instance.
+   */
+  @Test
+  public void testGetInstance() throws Exception {
+    final int num = 100;
+    // The worker threads all add to this list, so it must be thread-safe; a
+    // plain ArrayList can lose entries or leave null slots under concurrent add().
+    final List<DirectBufferPool> pools =
+        java.util.Collections.synchronizedList(new ArrayList<DirectBufferPool>());
+    Thread[] list = new Thread[num];
+    for (int i = 0; i < num; i++) {
+      Thread t = getPoolThread(pools);
+      t.start();
+      list[i] = t;
+    }
+    for (int i = 0; i < num; i++) {
+      // An interrupt fails the test instead of being printed and ignored.
+      list[i].join(10000);
+    }
+    // Every thread must have contributed exactly one entry.
+    assertEquals(num, pools.size());
+    DirectBufferPool p1 = pools.get(0);
+    assertNotNull(p1);
+    for (int i = 1; i < pools.size(); i++) {
+      DirectBufferPool p2 = pools.get(i);
+      assertNotNull(p2);
+      assertSame(p1, p2);
+    }
+  }
+
+  /** Builds (without starting) a thread that adds DirectBufferPool.getInstance() to {@code pools}. */
+  private Thread getPoolThread(final List<DirectBufferPool> pools) {
+    return new Thread() {
+      @Override
+      public void run() {
+        final DirectBufferPool instance = DirectBufferPool.getInstance();
+        pools.add(instance);
+      }
+    };
+  }
+
+
+  /**
+   * A returned buffer is handed out again by the next borrow of the same
+   * capacity, while a further borrow yields a different buffer.
+   */
+  @Test
+  public void testBufBorrow() throws IOException {
+    final DirectBufferPool pool = DirectBufferPool.getInstance();
+
+    final ByteBuffer first = pool.borrowBuffer(100);
+    assertFreshDirectBuffer(first);
+    pool.returnBuffer(first);
+
+    final ByteBuffer second = pool.borrowBuffer(100);
+    assertFreshDirectBuffer(second);
+    assertSame(first, second);
+
+    final ByteBuffer third = pool.borrowBuffer(100);
+    assertFreshDirectBuffer(third);
+    assertNotSame(second, third);
+
+    pool.returnBuffer(second);
+    pool.returnBuffer(third);
+  }
+
+  /** Asserts that {@code buffer} is direct, positioned at 0 and has capacity 100. */
+  private static void assertFreshDirectBuffer(ByteBuffer buffer) {
+    assertTrue(buffer.isDirect());
+    assertEquals(0, buffer.position());
+    assertEquals(100, buffer.capacity());
+  }
+
+  /**
+   * A buffer that was written to before being returned comes back from the
+   * next borrow with its position at 0 again.
+   */
+  @Test
+  public void testBufReset() throws IOException {
+    final DirectBufferPool pool = DirectBufferPool.getInstance();
+
+    final ByteBuffer used = pool.borrowBuffer(100);
+    assertTrue(used.isDirect());
+    assertEquals(0, used.position());
+    assertEquals(100, used.capacity());
+
+    // Move the position forward before handing the buffer back.
+    used.putInt(1);
+    assertEquals(4, used.position());
+    pool.returnBuffer(used);
+
+    final ByteBuffer reborrowed = pool.borrowBuffer(100);
+    assertSame(used, reborrowed);
+    assertTrue(reborrowed.isDirect());
+    assertEquals(0, reborrowed.position());
+    assertEquals(100, reborrowed.capacity());
+  }
+
+  /**
+   * Borrows 100 buffers, returns each of them from its own thread, and checks
+   * the pool's buffer count for that capacity before (0) and after (100).
+   */
+  @Test
+  public void testBufReturn() throws IOException {
+    final DirectBufferPool pool = DirectBufferPool.getInstance();
+    final int bufferCount = 100;
+    final int bufferSize = 100;
+
+    final ByteBuffer[] borrowed = new ByteBuffer[bufferCount];
+    for (int idx = 0; idx < borrowed.length; idx++) {
+      borrowed[idx] = pool.borrowBuffer(bufferSize);
+    }
+
+    assertEquals(0, pool.getBufCountsForCapacity(bufferSize));
+
+    // One returning thread per borrowed buffer.
+    final Thread[] workers = new Thread[bufferCount];
+    for (int idx = 0; idx < workers.length; idx++) {
+      workers[idx] = retBufThread(pool, borrowed, idx);
+      workers[idx].start();
+    }
+    for (final Thread worker : workers) {
+      try {
+        worker.join(10000);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    assertEquals(bufferCount, pool.getBufCountsForCapacity(bufferSize));
+  }
+
+ private Thread retBufThread(final DirectBufferPool bufferPool, final ByteBuffer[] bufs, final int i) {
+ Thread t = new Thread(new Runnable(){
+ @Override
+ public void run() {
+ try {
+ bufferPool.returnBuffer(bufs[i]);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ return t;
+ }
+
+  /**
+   * returnBuffer must throw an IOException both for null and for a buffer
+   * obtained from ByteBuffer.allocate.
+   */
+  @Test
+  public void testBufException() {
+    final DirectBufferPool pool = DirectBufferPool.getInstance();
+
+    boolean nullRejected;
+    try {
+      pool.returnBuffer(null);
+      nullRejected = false;
+    } catch (IOException e) {
+      nullRejected = true;
+    }
+    assertEquals(true, nullRejected);
+
+    final ByteBuffer heapBuffer = ByteBuffer.allocate(100);
+    boolean heapRejected;
+    try {
+      pool.returnBuffer(heapBuffer);
+      heapRejected = false;
+    } catch (IOException e) {
+      heapRejected = true;
+    }
+    assertEquals(true, heapRejected);
+  }
+
+  /**
+   * Borrows and returns 100 buffers, drops every local reference to them,
+   * requests garbage collection, and then expects the pool to report no
+   * pooled buffer of that capacity after one more borrow.
+   *
+   * NOTE(review): the outcome depends on System.gc() actually collecting the
+   * returned buffers; presumably the pool keeps them through weak references
+   * -- confirm in DirectBufferPool.
+   */
+  @Test
+  public void testBufWeakRefClear() throws IOException {
+    final DirectBufferPool bufferPool = DirectBufferPool.getInstance();
+    int numOfBufs = 100;
+    int capacity = 100;
+    // One slot per borrowed buffer: the array length is the buffer count,
+    // not the capacity of each buffer.
+    ByteBuffer[] list = new ByteBuffer[numOfBufs];
+    for (int i = 0; i < numOfBufs; i++) {
+      list[i] = bufferPool.borrowBuffer(capacity);
+    }
+    for (int i = 0; i < numOfBufs; i++) {
+      bufferPool.returnBuffer(list[i]);
+      // Drop the local reference to the returned buffer.
+      list[i] = null;
+    }
+
+    assertEquals(numOfBufs, bufferPool.getBufCountsForCapacity(capacity));
+
+    for (int i = 0; i < 3; i++) {
+      System.gc();
+    }
+
+    // The borrowed buffer itself is not inspected; the borrow is made for its effect on the pool.
+    ByteBuffer b = bufferPool.borrowBuffer(capacity);
+    assertEquals(0, bufferPool.getBufCountsForCapacity(capacity));
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestInputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.junit.Assert;
+
+/** Checks the initial state of an {@link InputBuffer} for each way of constructing it. */
+public class TestInputBuffer extends TestCase {
+  /**
+   * A new buffer must report the expected type and capacity, with position,
+   * length and remaining all at zero. Assertions pass the expected value
+   * first so that a failure message names the values correctly.
+   */
+  public void testInputBuffer() throws IOException {
+    final int size = 100;
+    final InputBuffer input1 = new InputBuffer(BufferType.DIRECT_BUFFER, size);
+    Assert.assertEquals(BufferType.DIRECT_BUFFER, input1.getType());
+
+    Assert.assertEquals(0, input1.position());
+    Assert.assertEquals(0, input1.length());
+    Assert.assertEquals(0, input1.remaining());
+    Assert.assertEquals(size, input1.capacity());
+
+    final InputBuffer input2 = new InputBuffer(BufferType.HEAP_BUFFER, size);
+    Assert.assertEquals(BufferType.HEAP_BUFFER, input2.getType());
+
+    Assert.assertEquals(0, input2.position());
+    Assert.assertEquals(0, input2.length());
+    Assert.assertEquals(0, input2.remaining());
+    Assert.assertEquals(size, input2.capacity());
+
+    // A buffer built from a byte array must report the heap type.
+    final InputBuffer input3 = new InputBuffer(new byte[size]);
+    Assert.assertEquals(BufferType.HEAP_BUFFER, input3.getType());
+
+    Assert.assertEquals(0, input3.position());
+    Assert.assertEquals(0, input3.length());
+    Assert.assertEquals(0, input3.remaining());
+    Assert.assertEquals(size, input3.capacity());
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestOutputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import junit.framework.TestCase;
+
+import org.junit.Assert;
+
+/** Checks the initial state of an {@link OutputBuffer} for each way of constructing it. */
+public class TestOutputBuffer extends TestCase {
+  /**
+   * A new buffer must report the expected type, a length of zero and a limit
+   * equal to the requested size. Assertions pass the expected value first so
+   * that a failure message names the values correctly.
+   */
+  public void testOutputBuffer() {
+    final int size = 100;
+    final OutputBuffer output1 = new OutputBuffer(BufferType.DIRECT_BUFFER, size);
+    Assert.assertEquals(BufferType.DIRECT_BUFFER, output1.getType());
+
+    Assert.assertEquals(0, output1.length());
+    Assert.assertEquals(size, output1.limit());
+
+    final OutputBuffer output2 = new OutputBuffer(BufferType.HEAP_BUFFER, size);
+    Assert.assertEquals(BufferType.HEAP_BUFFER, output2.getType());
+
+    Assert.assertEquals(0, output2.length());
+    Assert.assertEquals(size, output2.limit());
+
+    // A buffer built from a byte array must report the heap type.
+    final OutputBuffer output3 = new OutputBuffer(new byte[size]);
+    Assert.assertEquals(BufferType.HEAP_BUFFER, output3.getType());
+
+    Assert.assertEquals(0, output3.length());
+    Assert.assertEquals(size, output3.limit());
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestCombineHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.Task.CombinerRunner;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link CombinerHandler}, run against Mockito mocks of the
+ * native handler, the buffer pusher/puller and the combiner runner.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestCombineHandler extends TestCase {
+
+  private CombinerHandler handler;
+  private INativeHandler nativeHandler;
+  private BufferPusher pusher;
+  private BufferPuller puller;
+  private CombinerRunner combinerRunner;
+
+  /**
+   * Builds fresh mocks for every test and stubs the native handler so that
+   * getInputBuffer() returns an InputBuffer created with HEAP_BUFFER and 100.
+   */
+  @Override
+  public void setUp() throws IOException {
+    nativeHandler = Mockito.mock(INativeHandler.class);
+    pusher = Mockito.mock(BufferPusher.class);
+    puller = Mockito.mock(BufferPuller.class);
+    combinerRunner = Mockito.mock(CombinerRunner.class);
+
+    final InputBuffer inputBuffer = new InputBuffer(BufferType.HEAP_BUFFER, 100);
+    Mockito.when(nativeHandler.getInputBuffer()).thenReturn(inputBuffer);
+  }
+
+  /** Creates the handler under test on top of the mocks prepared in setUp(). */
+  private CombinerHandler newHandler() throws IOException {
+    return new CombinerHandler(nativeHandler, combinerRunner, puller, pusher);
+  }
+
+  /**
+   * Constructing the handler must register it as command dispatcher and the
+   * puller as data receiver on the native handler, once each.
+   */
+  public void testCommandDispatcherSetting() throws IOException {
+    handler = newHandler();
+
+    // verify(mock) checks for exactly one invocation.
+    Mockito.verify(nativeHandler).setCommandDispatcher(Matchers.eq(handler));
+    Mockito.verify(nativeHandler).setDataReceiver(Matchers.eq(puller));
+  }
+
+  /**
+   * A COMBINE command must return null and run the combiner once with the
+   * puller and pusher. close() is called twice on purpose: each collaborator
+   * must still have been closed only once.
+   */
+  public void testCombine() throws IOException, InterruptedException, ClassNotFoundException {
+    handler = newHandler();
+
+    final Object reply = handler.onCall(CombinerHandler.COMBINE, null);
+    Assert.assertEquals(null, reply);
+
+    handler.close();
+    handler.close();
+
+    Mockito.verify(combinerRunner).combine(Matchers.eq(puller), Matchers.eq(pusher));
+
+    Mockito.verify(pusher).close();
+    Mockito.verify(puller).close();
+    Mockito.verify(nativeHandler).close();
+  }
+
+  /** A command built from -1 must be answered with null. */
+  public void testOnCall() throws IOException {
+    handler = newHandler();
+
+    final Object reply = handler.onCall(new Command(-1), null);
+    Assert.assertEquals(null, reply);
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/handlers/TestNativeCollectorOnlyHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.OutputUtil;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link NativeCollectorOnlyHandler}, run against Mockito mocks
+ * of the native handler, the buffer pusher and the combine handler.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+public class TestNativeCollectorOnlyHandler extends TestCase {
+
+  private NativeCollectorOnlyHandler handler;
+  private INativeHandler nativeHandler;
+  private BufferPusher pusher;
+  private ICombineHandler combiner;
+  private TaskContext taskContext;
+  // Set as "mapred.local.dir" in setUp(); the paths expected in testOnCall() are built from it.
+  private String localDir = "build/test/mapred/local";
+
+  /**
+   * Builds fresh mocks and a TaskContext whose JobConf selects
+   * LocalJobOutputFiles as output manager and points the local dir at
+   * {@link #localDir}. All four class arguments of the context are
+   * BytesWritable; the last two constructor arguments are left null.
+   */
+  @Override
+  public void setUp() throws IOException {
+    this.nativeHandler = Mockito.mock(INativeHandler.class);
+    this.pusher = Mockito.mock(BufferPusher.class);
+    this.combiner = Mockito.mock(ICombineHandler.class);
+    JobConf jobConf = new JobConf();
+    jobConf.set(OutputUtil.NATIVE_TASK_OUTPUT_MANAGER,
+        "org.apache.hadoop.mapred.nativetask.util.LocalJobOutputFiles");
+    jobConf.set("mapred.local.dir", localDir);
+    this.taskContext = new TaskContext(jobConf,
+        BytesWritable.class, BytesWritable.class,
+        BytesWritable.class,
+        BytesWritable.class,
+        null,
+        null);
+
+    Mockito.when(nativeHandler.getInputBuffer()).thenReturn(new InputBuffer(BufferType.HEAP_BUFFER, 100));
+  }
+
+  /**
+   * collect() must be forwarded to the pusher exactly once. close() is called
+   * twice on purpose: pusher, combiner and native handler must still have
+   * been closed only once each.
+   */
+  public void testCollect() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner);
+    handler.collect(new BytesWritable(), new BytesWritable(), 100);
+    handler.close();
+    handler.close();
+
+    Mockito.verify(pusher, Mockito.times(1)).collect(Matchers.any(BytesWritable.class),
+        Matchers.any(BytesWritable.class), Matchers.anyInt());
+
+    Mockito.verify(pusher, Mockito.times(1)).close();
+    Mockito.verify(combiner, Mockito.times(1)).close();
+    Mockito.verify(nativeHandler, Mockito.times(1)).close();
+  }
+
+  /** GET_COMBINE_HANDLER must answer with the id reported by the combiner. */
+  public void testGetCombiner() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner);
+    Mockito.when(combiner.getId()).thenReturn(100L);
+    final ReadWriteBuffer result = handler.onCall(NativeCollectorOnlyHandler.GET_COMBINE_HANDLER, null);
+    Assert.assertEquals(100L, result.readLong());
+  }
+
+  /**
+   * A command built from -1 must be rejected with an IOException, and the
+   * three path commands must return locations under {@link #localDir}.
+   */
+  public void testOnCall() throws IOException {
+    this.handler = new NativeCollectorOnlyHandler(taskContext, nativeHandler, pusher, combiner);
+    try {
+      handler.onCall(new Command(-1), null);
+      // Reached only when no exception was raised; the message describes that failure.
+      Assert.fail("expected an IOException for Command(-1)");
+    } catch (final IOException expected) {
+      // This is the outcome the test requires.
+    }
+
+    final String expectedOutputPath = localDir + "/output/file.out";
+    final String expectedOutputIndexPath = localDir + "/output/file.out.index";
+    final String expectedSpillPath = localDir + "/output/spill0.out";
+
+    final String outputPath = handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_PATH, null).readString();
+    Assert.assertEquals(expectedOutputPath, outputPath);
+
+    final String outputIndexPath = handler.onCall(NativeCollectorOnlyHandler.GET_OUTPUT_INDEX_PATH, null).readString();
+    Assert.assertEquals(expectedOutputIndexPath, outputIndexPath);
+
+    final String spillPath = handler.onCall(NativeCollectorOnlyHandler.GET_SPILL_PATH, null).readString();
+    Assert.assertEquals(expectedSpillPath, spillPath);
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestKVSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream;
+import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput;
+import org.apache.hadoop.mapred.nativetask.testutil.TestInput.KV;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link KVSerializer}. The stream side is a Mockito mock, so
+ * the tests check which stream calls the serializer makes rather than the
+ * bytes it produces.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TestKVSerializer extends TestCase {
+
+  int inputArraySize = 1000; // number of BytesWritable key/value pairs requested from TestInput
+  int bufferSize = 100; // bytes
+  private KV<BytesWritable, BytesWritable>[] inputArray;
+
+  final ByteArrayOutputStream result = new ByteArrayOutputStream();
+  private SizedWritable key;
+  private SizedWritable value;
+  private KVSerializer serializer;
+
+  /**
+   * Loads the test inputs and primes {@code key}/{@code value} with the pair
+   * at index 4, calling updateLength() so that their {@code length} fields
+   * are set before the serialize tests read them.
+   */
+  @Override
+  @Before
+  public void setUp() throws IOException {
+    this.inputArray = TestInput.getMapInputs(inputArraySize);
+    this.key = new SizedWritable(BytesWritable.class);
+    this.value = new SizedWritable(BytesWritable.class);
+
+    this.serializer = new KVSerializer(BytesWritable.class, BytesWritable.class);
+
+    key.reset(inputArray[4].key);
+    value.reset(inputArray[4].value);
+    serializer.updateLength(key, value);
+  }
+
+  /**
+   * updateLength() must yield a strictly larger combined key/value length for
+   * each successive input pair. NOTE(review): this relies on
+   * TestInput.getMapInputs() returning pairs of growing size - confirm.
+   */
+  public void testUpdateLength() throws IOException {
+    int kvLength = 0;
+    for (int i = 0; i < inputArraySize; i++) {
+      key.reset(inputArray[i].key);
+      value.reset(inputArray[i].value);
+      serializer.updateLength(key, value);
+
+      // verify that the combined size increased compared to the previous pair
+      Assert.assertTrue(key.length + value.length > kvLength);
+      kvLength = key.length + value.length;
+    }
+  }
+
+  /**
+   * When the stream reports unflushed data and too little space for the
+   * record, serializeKV() must flush once before writing.
+   */
+  public void testSerializeKV() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(dataOut.shortOfSpace(key.length + value.length + Constants.SIZEOF_KV_LENGTH)).thenReturn(true);
+    final int written = serializer.serializeKV(dataOut, key, value);
+
+    // flush once, write 4 ints and 2 byte arrays
+    Mockito.verify(dataOut, Mockito.times(1)).flush();
+    Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+
+    Assert.assertEquals(key.length + value.length + Constants.SIZEOF_KV_LENGTH, written);
+  }
+
+  /** When the stream has enough space, serializeKV() must not flush. */
+  public void testSerializeNoFlush() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+
+    // suppose there is enough space
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(dataOut.shortOfSpace(Matchers.anyInt())).thenReturn(false);
+    final int written = serializer.serializeKV(dataOut, key, value);
+
+    // no flush, write 4 ints and 2 byte arrays
+    Mockito.verify(dataOut, Mockito.times(0)).flush();
+    Mockito.verify(dataOut, Mockito.times(4)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+
+    Assert.assertEquals(key.length + value.length + Constants.SIZEOF_KV_LENGTH, written);
+  }
+
+  /**
+   * Same as testSerializeKV() but for serializePartitionKV(): one more int
+   * is written and the returned size additionally includes
+   * Constants.SIZEOF_PARTITION_LENGTH.
+   */
+  public void testSerializePartitionKV() throws IOException {
+    final DataOutputStream dataOut = Mockito.mock(DataOutputStream.class);
+
+    Mockito.when(dataOut.hasUnFlushedData()).thenReturn(true);
+    Mockito.when(
+        dataOut
+            .shortOfSpace(key.length + value.length + Constants.SIZEOF_KV_LENGTH + Constants.SIZEOF_PARTITION_LENGTH))
+        .thenReturn(true);
+    final int written = serializer.serializePartitionKV(dataOut, 100, key, value);
+
+    // flush once, write 5 ints and 2 byte arrays
+    Mockito.verify(dataOut, Mockito.times(1)).flush();
+    Mockito.verify(dataOut, Mockito.times(5)).writeInt(Matchers.anyInt());
+    Mockito.verify(dataOut, Mockito.times(2)).write(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+
+    Assert.assertEquals(key.length + value.length + Constants.SIZEOF_KV_LENGTH
+        + Constants.SIZEOF_PARTITION_LENGTH, written);
+  }
+
+  /** With no unread data in the stream, deserializeKV() must return 0. */
+  public void testDeserializerNoData() throws IOException {
+    final DataInputStream in = Mockito.mock(DataInputStream.class);
+    Mockito.when(in.hasUnReadData()).thenReturn(false);
+    Assert.assertEquals(0, serializer.deserializeKV(in, key, value));
+  }
+
+  /**
+   * With unread data available, deserializeKV() must return a positive size
+   * after reading 4 ints and 2 byte arrays from the stream.
+   */
+  public void testDeserializer() throws IOException {
+    final DataInputStream in = Mockito.mock(DataInputStream.class);
+    Mockito.when(in.hasUnReadData()).thenReturn(true);
+    Assert.assertTrue(serializer.deserializeKV(in, key, value) > 0);
+
+    Mockito.verify(in, Mockito.times(4)).readInt();
+    Mockito.verify(in, Mockito.times(2)).readFully(Matchers.any(byte[].class), Matchers.anyInt(), Matchers.anyInt());
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/serde/TestNativeSerialization.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Tests serializer registration and lookup on the {@link NativeSerialization}
+ * instance, using the minimal key/value fixture classes declared below.
+ */
+@SuppressWarnings({ "rawtypes", "deprecation" })
+public class TestNativeSerialization extends TestCase {
+
+  /**
+   * After a reset, a class with a registered serializer must resolve to that
+   * serializer, an unregistered Writable must resolve to DefaultSerializer,
+   * and a class that is not a Writable must be rejected with an IOException.
+   */
+  public void testRegisterAndGet() throws IOException {
+    final NativeSerialization serialization = NativeSerialization.getInstance();
+    serialization.reset();
+
+    serialization.register(WritableKey.class.getName(), ComparableKeySerializer.class);
+
+    INativeSerializer serializer = serialization.getSerializer(WritableKey.class);
+    Assert.assertEquals(ComparableKeySerializer.class.getName(), serializer.getClass().getName());
+
+    serializer = serialization.getSerializer(WritableValue.class);
+    Assert.assertEquals(DefaultSerializer.class.getName(), serializer.getClass().getName());
+
+    try {
+      serialization.getSerializer(NonWritableValue.class);
+      // Reached only when the lookup did not throw.
+      Assert.fail("expected an IOException for a class that does not implement Writable");
+    } catch (final IOException expected) {
+      // This is the outcome the test requires.
+    }
+  }
+
+  /** Key fixture wrapping a single int; its Writable methods are no-ops. */
+  public static class WritableKey implements Writable {
+    private int value;
+
+    public WritableKey(int a) {
+      this.value = a;
+    }
+
+    /** Always 4, regardless of the stored value. */
+    public int getLength() {
+      return 4;
+    }
+
+    public int getValue() {
+      return value;
+    }
+
+    public void setValue(int v) {
+      this.value = v;
+    }
+
+    // Left empty: the test in this class only looks serializers up by class.
+    @Override
+    public void write(DataOutput out) throws IOException {
+
+    }
+
+    // Left empty for the same reason as write().
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
+  /** Writable fixture with no registered serializer; both methods are no-ops. */
+  public static class WritableValue implements Writable {
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
+  /** Fixture that deliberately does not implement Writable. */
+  public static class NonWritableValue {
+  }
+
+  /**
+   * Serializer registered for {@link WritableKey} in testRegisterAndGet().
+   * It writes and reads the key's int value and delegates the length to the key.
+   */
+  public static class ComparableKeySerializer implements INativeComparable, INativeSerializer<WritableKey> {
+
+    @Override
+    public int getLength(WritableKey w) throws IOException {
+      return w.getLength();
+    }
+
+    @Override
+    public void serialize(WritableKey w, DataOutput out) throws IOException {
+      out.writeInt(w.getValue());
+    }
+
+    // The length argument is not used; a single int is read.
+    @Override
+    public void deserialize(DataInput in, int length, WritableKey w) throws IOException {
+      w.setValue(in.readInt());
+    }
+  }
+}