You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [11/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "Merge.h"
+#include "lib/FileSystem.h"
+
+namespace NativeTask {
+
+/**
+ * Opens the spill file at spill->path on the local file system and wraps it in a
+ * heap-allocated IFileMergeEntry. The caller is responsible for deleting the result
+ * (e.g. by handing it to Merger::addMergeEntry(), whose destructor deletes entries).
+ */
+IFileMergeEntry * IFileMergeEntry::create(SingleSpillInfo * spill) {
+  InputStream * input = FileSystem::getLocal().open(spill->path);
+  // NOTE(review): the trailing `true` presumably hands ownership of `input` to the
+  // reader; confirm against IFileReader's constructor.
+  IFileReader * spillReader = new IFileReader(input, spill, true);
+  return new IFileMergeEntry(spillReader);
+}
+
+/**
+ * @param writer         destination of merged records (partitions are opened/closed on it)
+ * @param config         job configuration, stored as-is
+ * @param comparator     key comparator used to order the merge heap
+ * @param combineRunner  optional combiner; when NULL, merge() writes records straight through
+ */
+Merger::Merger(IFileWriter * writer, Config * config, ComparatorPtr comparator,
+    ICombineRunner * combineRunner)
+    : _writer(writer),
+      _config(config),
+      _combineRunner(combineRunner),
+      _first(true),
+      _comparator(comparator) {
+}
+
+/**
+ * Deletes every entry added through addMergeEntry(); _heap only aliases those
+ * pointers, so it is just emptied.
+ */
+Merger::~Merger() {
+  _heap.clear();
+  for (vector<MergeEntryPtr>::iterator it = _entries.begin(); it != _entries.end(); ++it) {
+    delete *it;
+  }
+  _entries.clear();
+}
+
+/**
+ * Adds an input to the merge. The Merger takes ownership: the entry is deleted
+ * in ~Merger().
+ */
+void Merger::addMergeEntry(MergeEntryPtr pme) {
+  _entries.insert(_entries.end(), pme);
+}
+
+/**
+ * Moves every entry to its next partition and, if there is one, opens a partition
+ * on the writer.
+ *
+ * @return true if all entries advanced to a new partition, false if all of them
+ *         are out of partitions (also false when there are no entries).
+ * @throws IOException if the entries disagree about whether another partition exists.
+ */
+bool Merger::startPartition() {
+  bool hasPartition = false;
+  for (size_t idx = 0; idx < _entries.size(); ++idx) {
+    const bool advanced = _entries[idx]->nextPartition();
+    if (idx == 0) {
+      // The first entry decides; all others must agree with it.
+      hasPartition = advanced;
+    } else if (advanced != hasPartition) {
+      THROW_EXCEPTION(IOException, "MergeEntry partition number not equal");
+    }
+  }
+  if (hasPartition) {
+    _writer->startPartition();
+  }
+  return hasPartition;
+}
+
+/**
+ * Closes, on the output writer, the partition opened by startPartition().
+ */
+void Merger::endPartition() {
+  IFileWriter * out = _writer;
+  out->endPartition();
+}
+
+/**
+ * Rebuilds _heap for the current partition. Every entry is advanced to its first
+ * record, entries without records are left out, and the rest are heap-ordered by
+ * _comparator (smallest key at _heap[0]).
+ */
+void Merger::initHeap() {
+  _heap.clear();
+  for (size_t i = 0; i < _entries.size(); i++) {
+    MergeEntryPtr pme = _entries[i];
+    if (pme->next()) {
+      _heap.push_back(pme);
+    }
+  }
+  // &_heap[0] on an empty vector is undefined behavior; there is nothing to order anyway.
+  if (_heap.empty()) {
+    return;
+  }
+  MergeEntryPtr * base = &(_heap[0]);
+  makeHeap(base, base + _heap.size(), _comparator);
+}
+
+/**
+ * Advances the merge by one record.
+ *
+ * The first call after initHeap() only clears _first, because initHeap() already
+ * positioned every entry on its first record. Later calls advance the entry at the
+ * top of the heap and restore heap order, dropping that entry once it is exhausted.
+ *
+ * @return true if _heap[0] now holds the next record in order, false when the
+ *         current partition has no more records.
+ */
+bool Merger::next() {
+  const size_t heapSize = _heap.size();
+  if (heapSize == 0) {
+    return false;
+  }
+  if (_first) {
+    _first = false;
+    return true;
+  }
+  MergeEntryPtr * top = &(_heap[0]);
+  if (!top[0]->next()) {
+    // Top entry is exhausted: overwrite it with the last element and shrink the heap.
+    popHeap(top, top + heapSize, _comparator);
+    _heap.pop_back();
+    return !_heap.empty();
+  }
+  // Top entry moved to its next record; sift it back into place.
+  if (heapSize == 2) {
+    // With two entries a single comparison is enough.
+    if (_comparator(top[1], top[0])) {
+      std::swap(top[0], top[1]);
+    }
+  } else if (heapSize > 2) {
+    heapify(top, 1, heapSize, _comparator);
+  }
+  return true;
+}
+
+/**
+ * KVIterator interface: advances to the next merged record and points key/value
+ * at the top entry's current record via Buffer::reset().
+ *
+ * @return false once the current partition is exhausted (key/value untouched).
+ */
+bool Merger::next(Buffer & key, Buffer & value) {
+  if (!next()) {
+    return false;
+  }
+  MergeEntryPtr current = _heap[0];
+  key.reset(current->getKey(), current->getKeyLength());
+  value.reset(current->getValue(), current->getValueLength());
+  return true;
+}
+
+/**
+ * Merges all entries partition by partition.
+ *
+ * For each partition reported by startPartition(), the heap is rebuilt and records
+ * are either written in comparator order to the writer (no combiner) or handed to
+ * the combiner, which receives this Merger as its KVIterator input. Afterwards one
+ * summary line is logged; total_record is only counted in the no-combiner path, so
+ * the combiner path uses the record-less log form.
+ */
+void Merger::merge() {
+  Timer timer;
+  uint64_t total_record = 0;
+  _heap.reserve(_entries.size());
+  while (startPartition()) {
+    initHeap();
+    if (_heap.size() == 0) {
+      // No entry has records here; still close the partition opened on the writer.
+      endPartition();
+      continue;
+    }
+    _first = true;
+    if (_combineRunner == NULL) {
+      while (next()) {
+        // next() returned true, so _heap is non-empty and _heap[0] is the smallest
+        // record. Read it each time instead of caching &_heap[0] before the loop:
+        // that pointer would be taken from a possibly empty vector (undefined
+        // behavior) and silently depend on reserve() never reallocating.
+        const MergeEntryPtr top = _heap[0];
+        _writer->write(top->getKey(), top->getKeyLength(), top->getValue(),
+            top->getValueLength());
+        total_record++;
+      }
+    } else {
+      _combineRunner->combine(CombineContext(UNKNOWN), this, _writer);
+    }
+    endPartition();
+  }
+
+  uint64_t interval = (timer.now() - timer.last());
+  uint64_t M = 1000000; //1 million
+
+  uint64_t output_size;
+  uint64_t real_output_size;
+  _writer->getStatistics(output_size, real_output_size);
+
+  // Cast varargs to exactly the types named by the format specifiers: uint64_t is
+  // `unsigned long` on LP64 and size_t is `unsigned int` on 32-bit targets.
+  if (total_record != 0) {
+    LOG("[Merge] Merged segment#: %lu, record#: %llu, avg record size: %llu, uncompressed total bytes: %llu, compressed total bytes: %llu, time: %llu ms",
+        (unsigned long)_entries.size(),
+        (unsigned long long)total_record,
+        (unsigned long long)(output_size / total_record),
+        (unsigned long long)output_size,
+        (unsigned long long)real_output_size,
+        (unsigned long long)(interval / M));
+  } else {
+    LOG("[Merge] Merged segments#, %lu, uncompressed total bytes: %llu, compressed total bytes: %llu, time: %llu ms",
+        (unsigned long)_entries.size(),
+        (unsigned long long)output_size,
+        (unsigned long long)real_output_size,
+        (unsigned long long)(interval / M));
+  }
+}
+
+} // namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MERGE_H_
+#define MERGE_H_
+
+#include "NativeTask.h"
+#include "Buffers.h"
+#include "MapOutputCollector.h"
+#include "IFile.h"
+#include "MinHeap.h"
+
+namespace NativeTask {
+
+/**
+ * One sorted input of the merge.
+ *
+ * Subclasses step through partitions with nextPartition() and through records of
+ * the current partition with next(). After next() returns true, the four protected
+ * fields below describe the current record.
+ */
+class MergeEntry {
+
+protected:
+  // Current record; meaningful only after next() has returned true.
+  const char * _key;
+  const char * _value;
+  uint32_t _keyLength;
+  uint32_t _valueLength;
+
+public:
+  MergeEntry()
+      : _key(NULL), _value(NULL), _keyLength(0), _valueLength(0) {
+  }
+
+  virtual ~MergeEntry() {
+  }
+
+  /** Key bytes of the current record. */
+  const char * getKey() const {
+    return _key;
+  }
+
+  uint32_t getKeyLength() const {
+    return _keyLength;
+  }
+
+  /** Value bytes of the current record. */
+  const char * getValue() const {
+    return _value;
+  }
+
+  uint32_t getValueLength() const {
+    return _valueLength;
+  }
+
+  /**
+   * Advances to the next partition.
+   * @return true if another partition is available, false when there are no more.
+   */
+  virtual bool nextPartition() = 0;
+
+  /**
+   * Advances to the next key/value of the current partition.
+   * @return true if a record was read, false when the partition is exhausted.
+   */
+  virtual bool next() = 0;
+};
+
+/** Non-owning handle to a merge input. */
+typedef MergeEntry * MergeEntryPtr;
+
+/**
+ * Orders merge entries by their current key using a raw key comparator.
+ *
+ * Returns true when lhs's key sorts strictly before rhs's key. The MinHeap helpers
+ * keep the Comp-smallest element on top, so the Merger's heap is a min-heap by key.
+ */
+class MergeEntryComparator {
+private:
+  ComparatorPtr _keyComparator;
+
+public:
+  MergeEntryComparator(ComparatorPtr comparator)
+      : _keyComparator(comparator) {
+  }
+
+public:
+  // const: comparing does not modify the comparator, so it is usable through const references.
+  bool operator()(const MergeEntryPtr lhs, const MergeEntryPtr rhs) const {
+    return (*_keyComparator)(lhs->getKey(), lhs->getKeyLength(), rhs->getKey(),
+        rhs->getKeyLength()) < 0;
+  }
+};
+
+/**
+ * Merge entry that reads records directly from in-memory partition buckets.
+ *
+ * Partitions are visited in index order 0 .. numberOfPartitions-1; a NULL bucket
+ * behaves like an empty partition.
+ */
+class MemoryMergeEntry : public MergeEntry {
+protected:
+
+  // Not owned (never deleted here); one bucket pointer per partition, may be NULL.
+  PartitionBucket ** _partitions;
+  uint32_t _number;
+  // Index of the current partition; -1 before the first nextPartition().
+  int64_t _index;
+
+  // Owned iterator over the current bucket, or NULL when there is none.
+  KVIterator * _iterator;
+  // Filled by the iterator; _key/_value point at their data.
+  Buffer keyBuffer;
+  Buffer valueBuffer;
+
+public:
+  MemoryMergeEntry(PartitionBucket ** partitions, uint32_t numberOfPartitions)
+      : _partitions(partitions), _number(numberOfPartitions), _index(-1), _iterator(NULL) {
+  }
+
+  virtual ~MemoryMergeEntry() {
+    delete _iterator;
+    _iterator = NULL;
+  }
+
+  /**
+   * Moves to the next partition and opens an iterator over its bucket.
+   * @return true if a partition was entered (it may be empty), false once every
+   *         partition has been visited.
+   */
+  virtual bool nextPartition() {
+    ++_index;
+    if (_index >= _number) {
+      return false;
+    }
+    PartitionBucket * bucket = _partitions[_index];
+    delete _iterator;
+    _iterator = NULL;
+    if (NULL != bucket) {
+      _iterator = bucket->getIterator();
+    }
+    return true;
+  }
+
+  /**
+   * Reads the next record of the current partition.
+   * @return true with the key/value fields set; false if the partition has no
+   *         iterator or the iterator is exhausted (in the latter case pointers are
+   *         set to NULL and lengths to 0xffffffff).
+   */
+  virtual bool next() {
+    if (NULL == _iterator) {
+      return false;
+    }
+    if (!_iterator->next(keyBuffer, valueBuffer)) {
+      // Poison the current record so any use after exhaustion is easy to spot.
+      _key = NULL;
+      _value = NULL;
+      _keyLength = 0xffffffff;
+      _valueLength = 0xffffffff;
+      return false;
+    }
+    _key = keyBuffer.data();
+    _keyLength = keyBuffer.length();
+    _value = valueBuffer.data();
+    _valueLength = valueBuffer.length();
+    assert(_value != NULL);
+    return true;
+  }
+};
+
+/**
+ * Merge entry that reads records from an intermediate (IFile) spill through an
+ * IFileReader.
+ */
+class IFileMergeEntry : public MergeEntry {
+protected:
+  // Owned: deleted in the destructor.
+  IFileReader * _reader;
+  // NOTE(review): not used after construction anywhere in this header.
+  bool new_partition;
+
+public:
+  /**
+   * Opens spill->path on the local file system and returns a heap-allocated
+   * entry that reads it.
+   */
+  static IFileMergeEntry * create(SingleSpillInfo * spill);
+
+  /**
+   * @param reader source of records; this entry takes ownership and deletes it
+   *               in its destructor.
+   */
+  IFileMergeEntry(IFileReader * reader)
+      : _reader(reader), new_partition(false) {
+  }
+
+  virtual ~IFileMergeEntry() {
+    delete _reader;
+    _reader = NULL;
+  }
+
+  /**
+   * Forwards to IFileReader::nextPartition().
+   * @return true if another partition is available (per the MergeEntry contract).
+   */
+  virtual bool nextPartition() {
+    return _reader->nextPartition();
+  }
+
+  /**
+   * Reads the next record of the current partition into the inherited fields.
+   * @return false when the reader yields no key; both lengths are then set to
+   *         0xffffffff so accidental use stands out.
+   */
+  virtual bool next() {
+    _key = _reader->nextKey(_keyLength);
+    if (unlikely(NULL == _key)) {
+      _valueLength = 0xffffffffU;
+      _keyLength = 0xffffffffU;
+      return false;
+    }
+    _value = _reader->value(_valueLength);
+    return true;
+  }
+};
+
+/**
+ * K-way merger over a set of MergeEntry inputs.
+ *
+ * merge() walks the partitions of all entries in lock step and writes records in
+ * comparator order to the IFileWriter, or hands itself (as a KVIterator) to the
+ * ICombineRunner when one was supplied.
+ */
+class Merger : public KVIterator {
+
+private:
+  // Owned inputs; deleted in ~Merger().
+  vector<MergeEntryPtr> _entries;
+  // Entries that still have records in the current partition, heap-ordered by
+  // _comparator with the smallest key at _heap[0]. Aliases pointers in _entries.
+  vector<MergeEntryPtr> _heap;
+  IFileWriter * _writer;
+  Config * _config;
+  // Optional; NULL means merge() writes records without combining.
+  ICombineRunner * _combineRunner;
+  // True until next() is first called for the current partition: initHeap()
+  // has already positioned every entry on its first record.
+  bool _first;
+  MergeEntryComparator _comparator;
+
+  // The Merger owns the pointers in _entries, so a memberwise copy would delete
+  // them twice. Declared but intentionally not defined to forbid copying.
+  Merger(const Merger &);
+  Merger & operator=(const Merger &);
+
+public:
+  Merger(IFileWriter * writer, Config * config, ComparatorPtr comparator,
+      ICombineRunner * combineRunner = NULL);
+
+  ~Merger();
+
+  /** Takes ownership of pme; it is deleted when the Merger is destroyed. */
+  void addMergeEntry(MergeEntryPtr pme);
+
+  /** Merges every partition of every entry into the writer (or the combiner). */
+  void merge();
+
+  /**
+   * KVIterator interface: advances to the next merged record and points key/value at it.
+   * @return false when the current partition is exhausted.
+   */
+  virtual bool next(Buffer & key, Buffer & value);
+protected:
+  bool startPartition();
+  void endPartition();
+  void initHeap();
+  bool next();
+};
+
+} // namespace NativeTask
+
+#endif /* MERGE_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MIN_HEAP_H_
+#define MIN_HEAP_H_
+
+#include "NativeTask.h"
+#include "Buffers.h"
+
+/**
+ * Sift-down for a binary heap stored in first[0 .. heap_len-1] using 1-based node
+ * numbers: node k lives at first[k - 1] and its children are nodes 2k and 2k + 1.
+ * Restores heap order below node rt, where Comp(a, b) == true means a belongs above
+ * b (a min-heap for a less-than comparator).
+ */
+template<typename T, typename Compare>
+void heapify(T* first, int rt, int heap_len, Compare & Comp) {
+  for (;;) {
+    const int left = rt * 2;
+    if (left > heap_len) {
+      break;  // rt is a leaf
+    }
+    const int right = left + 1;
+    int top = rt;
+    if (Comp(first[left - 1], first[top - 1])) {
+      top = left;
+    }
+    if (right <= heap_len && Comp(first[right - 1], first[top - 1])) {
+      top = right;
+    }
+    if (top == rt) {
+      break;
+    }
+    std::swap(first[top - 1], first[rt - 1]);
+    rt = top;
+  }
+}
+
+/**
+ * Arranges [begin, end) into a heap ordered by Comp. Empty ranges are left untouched.
+ */
+template<typename T, typename Compare>
+void makeHeap(T* begin, T* end, Compare & Comp) {
+  const int heap_len = end - begin;
+  if (heap_len < 0) {
+    return;
+  }
+  // Sift down every internal node, deepest first.
+  for (int node = heap_len / 2; node > 0; --node) {
+    heapify(begin, node, heap_len, Comp);
+  }
+}
+
+/**
+ * Removes the root of the non-empty heap [begin, end): the root is overwritten by the
+ * last element (not moved to the back as std::pop_heap does) and [begin, end - 1) is
+ * re-heapified. The caller must then drop the last slot itself.
+ */
+template<typename T, typename Compare>
+void popHeap(T* begin, T* end, Compare & Comp) {
+  T * const last = end - 1;
+  *begin = *last;
+  heapify(begin, 1, (int)(last - begin), Comp);
+}
+
+#endif /* HEAP_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <dlfcn.h>
+
+#include "commons.h"
+#include "NativeObjectFactory.h"
+#include "NativeLibrary.h"
+
+namespace NativeTask {
+
+//////////////////////////////////////////////////////////////////
+// NativeLibrary methods
+//////////////////////////////////////////////////////////////////
+
+/**
+ * Records the shared-object path and the symbol-name prefix; nothing is loaded
+ * until init() is called, and both entry points start out unresolved (NULL).
+ */
+NativeLibrary::NativeLibrary(const string & path, const string & name)
+    : _path(path),
+      _name(name),
+      _getObjectCreatorFunc(NULL),
+      _functionGetter(NULL) {
+}
+
+/**
+ * Loads the shared object at _path and resolves the optional entry points
+ * <name>GetObjectCreator, <name>GetFunctionGetter and <name>Init (the latter is
+ * called immediately if present).
+ *
+ * @return false if dlopen() fails; true otherwise, even when some entry points are
+ *         missing (those are only logged).
+ */
+bool NativeLibrary::init() {
+  void * library = dlopen(_path.c_str(), RTLD_LAZY | RTLD_GLOBAL);
+  if (NULL == library) {
+    // Capture the loader's diagnostic first; without it the failure cause is lost.
+    const char * reason = dlerror();
+    LOG("[NativeLibrary] Load object library %s failed.", _path.c_str());
+    if (NULL != reason) {
+      LOG("[NativeLibrary] dlopen error: %s", reason);
+    }
+    return false;
+  }
+  // clear any stale error status before the dlsym() lookups
+  dlerror();
+
+  string create_object_func_name = _name + "GetObjectCreator";
+  _getObjectCreatorFunc = (GetObjectCreatorFunc)dlsym(library, create_object_func_name.c_str());
+  if (NULL == _getObjectCreatorFunc) {
+    LOG("[NativeLibrary] ObjectCreator function [%s] not found", create_object_func_name.c_str());
+  }
+
+  string functionGetter = _name + "GetFunctionGetter";
+  _functionGetter = (FunctionGetter)dlsym(library, functionGetter.c_str());
+  if (NULL == _functionGetter) {
+    LOG("[NativeLibrary] function getter [%s] not found", functionGetter.c_str());
+  }
+
+  string init_library_func_name = _name + "Init";
+  InitLibraryFunc init_library_func = (InitLibraryFunc)dlsym(library,
+      init_library_func_name.c_str());
+  if (NULL == init_library_func) {
+    LOG("[NativeLibrary] Library init function [%s] not found", init_library_func_name.c_str());
+  } else {
+    init_library_func();
+  }
+  // The handle is neither stored nor dlclose()d, so the library stays loaded.
+  return true;
+}
+
+/**
+ * Instantiates class `clz` through this library's object creator.
+ *
+ * @return the new object, or NULL if the library has no creator lookup or does
+ *         not provide `clz`.
+ */
+NativeObject * NativeLibrary::createObject(const string & clz) {
+  if (NULL == _getObjectCreatorFunc) {
+    return NULL;
+  }
+  ObjectCreatorFunc creator = _getObjectCreatorFunc(clz);
+  if (NULL == creator) {
+    // Unknown class in this library (callers of getObjectCreator() check for NULL
+    // too); calling through it would dereference a NULL function pointer.
+    return NULL;
+  }
+  return (NativeObject*)(creator());
+}
+
+/**
+ * Looks up a plain function exported through this library's function getter.
+ * @return NULL if the library has no getter; otherwise whatever the getter returns.
+ */
+void * NativeLibrary::getFunction(const string & functionName) {
+  if (_functionGetter != NULL) {
+    return (*_functionGetter)(functionName);
+  }
+  return NULL;
+}
+
+/**
+ * Returns the creator function for class `clz`.
+ * @return NULL if the library has no creator lookup; otherwise the lookup's result.
+ */
+ObjectCreatorFunc NativeLibrary::getObjectCreator(const string & clz) {
+  if (_getObjectCreatorFunc != NULL) {
+    return _getObjectCreatorFunc(clz);
+  }
+  return NULL;
+}
+
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NATIVELIBRARY_H_
+#define NATIVELIBRARY_H_
+
+#include <string>
+
+namespace NativeTask {
+
+using std::string;
+class NativeObject;
+class NativeObjectFactory;
+
+/**
+ * A dynamically loaded library of native objects and functions.
+ *
+ * init() dlopen()s the path and resolves entry points prefixed with the name;
+ * createObject(), getObjectCreator() and getFunction() return NULL when the
+ * corresponding entry point is unavailable.
+ *
+ * NOTE(review): GetObjectCreatorFunc, ObjectCreatorFunc and FunctionGetter are not
+ * declared by anything this header includes (only <string>); it relies on includers
+ * having declared them first -- confirm and consider including their header.
+ */
+class NativeLibrary {
+  // NativeObjectFactory::Init() installs the built-in library's creator directly.
+  friend class NativeObjectFactory;
+
+public:
+  NativeLibrary(const string & path, const string & name);
+
+  ~NativeLibrary() {
+  }
+
+  /** Loads the library; returns false if it could not be opened. */
+  bool init();
+
+  NativeObject * createObject(const string & clz);
+
+  ObjectCreatorFunc getObjectCreator(const string & clz);
+
+  void * getFunction(const string & functionName);
+
+private:
+  string _path;
+  string _name;
+  GetObjectCreatorFunc _getObjectCreatorFunc;
+  FunctionGetter _functionGetter;
+};
+
+} // namespace NativeTask
+
+#endif /* NATIVELIBRARY_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#include <string.h>
+#ifndef __CYGWIN__
+#include <execinfo.h>
+#endif
+#include "commons.h"
+#include "NativeTask.h"
+#include "NativeObjectFactory.h"
+#include "NativeLibrary.h"
+#include "BufferStream.h"
+#include "util/StringUtil.h"
+#include "util/SyncUtils.h"
+#include "util/WritableUtils.h"
+#include "handler/BatchHandler.h"
+#include "handler/MCollectorOutputHandler.h"
+#include "handler/CombineHandler.h"
+
+using namespace NativeTask;
+
+// TODO: just for debug, should be removed
+// Crash handler: prints the signal number and, where <execinfo.h> is available, up to
+// 10 stack frames to stderr, then exits with status 1. It is not installed by default.
+// NOTE(review): fprintf() and exit() are not async-signal-safe.
+extern "C" void handler(int sig) {
+  // print out all the frames to stderr
+  fprintf(stderr, "Error: signal %d:\n", sig);
+
+#ifndef __CYGWIN__
+  // Declared only here so Cygwin builds carry no unused locals.
+  void * frames[10];
+  size_t depth = backtrace(frames, 10);
+  backtrace_symbols_fd(frames, depth, 2);  // fd 2 is stderr
+#endif
+
+  exit(1);
+}
+
+// Initialization body of the built-in "NativeTask" library.
+// NOTE(review): presumably DEFINE_NATIVE_LIBRARY expands to the NativeTaskInit()
+// function that NativeObjectFactory::Init() calls -- confirm in NativeTask.h.
+DEFINE_NATIVE_LIBRARY(NativeTask) {
+ // Left disabled: uncommenting installs the backtrace-printing handler for SIGSEGV.
+ //signal(SIGSEGV, handler);
+ // Register the built-in classes (presumably under the "NativeTask." prefix,
+ // matching the default class names below).
+ REGISTER_CLASS(BatchHandler, NativeTask);
+ REGISTER_CLASS(CombineHandler, NativeTask);
+ REGISTER_CLASS(MCollectorOutputHandler, NativeTask);
+ REGISTER_CLASS(Mapper, NativeTask);
+ REGISTER_CLASS(Reducer, NativeTask);
+ REGISTER_CLASS(Partitioner, NativeTask);
+ REGISTER_CLASS(Folder, NativeTask);
+ // Class used by CreateDefaultObject() for each NativeObjectType.
+ NativeObjectFactory::SetDefaultClass(BatchHandlerType, "NativeTask.BatchHandler");
+ NativeObjectFactory::SetDefaultClass(MapperType, "NativeTask.Mapper");
+ NativeObjectFactory::SetDefaultClass(ReducerType, "NativeTask.Reducer");
+ NativeObjectFactory::SetDefaultClass(PartitionerType, "NativeTask.Partitioner");
+ NativeObjectFactory::SetDefaultClass(FolderType, "NativeTask.Folder");
+}
+
+namespace NativeTask {
+
+// Storage behind GlobalConfig / GetConfig().
+static Config G_CONFIG;
+
+// Registered libraries; GetFunction()/GetObjectCreator() search them newest-first.
+vector<NativeLibrary *> NativeObjectFactory::Libraries;
+// NativeObjectType -> class name used by CreateDefaultObject().
+map<NativeObjectType, string> NativeObjectFactory::DefaultClasses;
+Config * NativeObjectFactory::GlobalConfig = &G_CONFIG;
+// Last value from GetTaskProgress(); returned unchanged while TaskProgress is NULL.
+float NativeObjectFactory::LastProgress = 0;
+Progress * NativeObjectFactory::TaskProgress = NULL;
+// Pending status text; written and cleared by GetTaskStatusUpdate().
+string NativeObjectFactory::LastStatus;
+// NOTE(review): ordered by pointer value (default std::less<Counter *>), so it
+// cannot find a counter by (group, name).
+set<Counter *> NativeObjectFactory::CounterSet;
+// Counters and CounterLastUpdateValues are parallel: entry i of the latter is the
+// value Counters[i] had at the previous GetTaskStatusUpdate().
+vector<Counter *> NativeObjectFactory::Counters;
+vector<uint64_t> NativeObjectFactory::CounterLastUpdateValues;
+bool NativeObjectFactory::Inited = false;
+
+// Held by Init() and Release().
+static Lock FactoryLock;
+
+/**
+ * One-time initialization of the factory, serialized by FactoryLock.
+ *
+ * Selects the log device from NATIVE_LOG_DEVICE ("stdout", "stderr" or a file path),
+ * runs NativeTaskInit(), registers the built-in NativeTask library, then registers
+ * the extra libraries listed in NATIVE_CLASS_LIBRARY_BUILDIN as comma-separated
+ * name=path pairs.
+ *
+ * @return false if an extra library fails to register; true otherwise, including
+ *         when the factory was already initialized.
+ */
+bool NativeObjectFactory::Init() {
+  ScopeLock<Lock> autoLock(FactoryLock);
+  if (Inited == false) {
+    // setup log device
+    string device = GetConfig().get(NATIVE_LOG_DEVICE, "stderr");
+    if (device == "stdout") {
+      LOG_DEVICE = stdout;
+    } else if (device == "stderr") {
+      LOG_DEVICE = stderr;
+    } else {
+      FILE * logFile = fopen(device.c_str(), "w");
+      if (NULL == logFile) {
+        // Never leave LOG_DEVICE NULL: LOG output presumably goes through it, and
+        // Release() would fclose(NULL). Fall back to stderr and say why.
+        LOG_DEVICE = stderr;
+        LOG("[NativeObjectLibrary] Failed to open log device [%s], using stderr",
+            device.c_str());
+      } else {
+        LOG_DEVICE = logFile;
+      }
+    }
+    NativeTaskInit();
+    // The built-in library is never dlopen()ed (init() is not called on it); its
+    // object creator is installed directly.
+    NativeLibrary * library = new NativeLibrary("libnativetask.so", "NativeTask");
+    library->_getObjectCreatorFunc = NativeTaskGetObjectCreator;
+    Libraries.push_back(library);
+    // Mark initialized before loading extra libraries: RegisterLibrary() calls
+    // CheckInit(), which would otherwise re-enter Init() while FactoryLock is held.
+    Inited = true;
+    // load extra user provided libraries
+    string libraryConf = GetConfig().get(NATIVE_CLASS_LIBRARY_BUILDIN, "");
+    if (libraryConf.length() > 0) {
+      vector<string> libraries;
+      vector<string> nameAndPath;
+      StringUtil::Split(libraryConf, ",", libraries, true);
+      for (size_t i = 0; i < libraries.size(); i++) {
+        nameAndPath.clear();
+        StringUtil::Split(libraries[i], "=", nameAndPath, true);
+        if (nameAndPath.size() == 2) {
+          string & name = nameAndPath[0];
+          string & path = nameAndPath[1];
+          LOG("[NativeObjectLibrary] Try to load library [%s] with file [%s]", name.c_str(),
+              path.c_str());
+          if (false == RegisterLibrary(path, name)) {
+            LOG("[NativeObjectLibrary] RegisterLibrary failed: name=%s path=%s", name.c_str(),
+                path.c_str());
+            return false;
+          } else {
+            LOG("[NativeObjectLibrary] RegisterLibrary success: name=%s path=%s", name.c_str(),
+                path.c_str());
+          }
+        } else {
+          LOG("[NativeObjectLibrary] Illegal native.class.libray: [%s] in [%s]",
+              libraries[i].c_str(), libraryConf.c_str());
+        }
+      }
+    }
+    const char * version = GetConfig().get(NATIVE_HADOOP_VERSION);
+    LOG("[NativeObjectLibrary] NativeTask library initialized with hadoop %s",
+        version==NULL?"unkown":version);
+  }
+  return true;
+}
+
+/**
+ * Tears down what Init() set up: deletes registered libraries and counters, closes
+ * a file log device (restoring stderr) and marks the factory uninitialized.
+ */
+void NativeObjectFactory::Release() {
+  ScopeLock<Lock> autoLock(FactoryLock);
+  // Delete in reverse registration order.
+  for (ssize_t i = Libraries.size() - 1; i >= 0; i--) {
+    delete Libraries[i];
+    Libraries[i] = NULL;
+  }
+  Libraries.clear();
+  for (size_t i = 0; i < Counters.size(); i++) {
+    delete Counters[i];
+  }
+  Counters.clear();
+  // CounterSet holds the pointers just deleted, and CounterLastUpdateValues is indexed
+  // in parallel with Counters by GetTaskStatusUpdate(). Reset both too, or a later
+  // re-init sees dangling pointers and pairs new counters with stale last values.
+  CounterSet.clear();
+  CounterLastUpdateValues.clear();
+  if (LOG_DEVICE != stdout && LOG_DEVICE != stderr) {
+    fclose(LOG_DEVICE);
+    LOG_DEVICE = stderr;
+  }
+  Inited = false;
+}
+
+/**
+ * Lazily runs Init() on first use.
+ * @throws IOException (by value) if Init() reports failure.
+ */
+void NativeObjectFactory::CheckInit() {
+  if (Inited == false) {
+    if (!Init()) {
+      // Throw by value: `throw new IOException(...)` throws a pointer, which
+      // handlers catching IOException by reference never match, and it leaks.
+      throw IOException("Init NativeTask library failed.");
+    }
+  }
+}
+
+/** @return the process-wide configuration (the object GlobalConfig points to). */
+Config & NativeObjectFactory::GetConfig() {
+  return *GetConfigPtr();
+}
+
+/** @return pointer to the process-wide configuration. */
+Config * NativeObjectFactory::GetConfigPtr() {
+  return GlobalConfig;
+}
+
+/**
+ * Sets the object GetTaskProgress() polls; with NULL, the last polled value keeps
+ * being reported.
+ */
+void NativeObjectFactory::SetTaskProgressSource(Progress * progress) {
+  TaskProgress = progress;
+}
+
+/** Refreshes LastProgress from the progress source, if any, and returns it. */
+float NativeObjectFactory::GetTaskProgress() {
+  if (NULL != TaskProgress) {
+    LastProgress = TaskProgress->getProgress();
+  }
+  return LastProgress;
+}
+
+/** Stores a status message; the next GetTaskStatusUpdate() sends and clears it. */
+void NativeObjectFactory::SetTaskStatus(const string & status) {
+  LastStatus = status;
+}
+
+// Guards Counters and CounterLastUpdateValues.
+static Lock CountersLock;
+
+/**
+ * Serializes a status report into statusData:
+ *   progress (float), status (Text), number of counters (int),
+ *   then per counter: group (Text), name (Text), increment since the last report (Long).
+ * The pending status text is cleared once written.
+ */
+void NativeObjectFactory::GetTaskStatusUpdate(string & statusData) {
+  OutputStringStream os(statusData);
+  WritableUtils::WriteFloat(&os, GetTaskProgress());
+  WritableUtils::WriteText(&os, LastStatus);
+  LastStatus.clear();
+
+  ScopeLock<Lock> countersGuard(CountersLock);
+  const uint32_t counterCount = (uint32_t)Counters.size();
+  WritableUtils::WriteInt(&os, counterCount);
+  for (size_t idx = 0; idx < counterCount; ++idx) {
+    Counter * const counter = Counters[idx];
+    const uint64_t current = counter->get();
+    const uint64_t delta = current - CounterLastUpdateValues[idx];
+    CounterLastUpdateValues[idx] = current;
+    WritableUtils::WriteText(&os, counter->group());
+    WritableUtils::WriteText(&os, counter->name());
+    WritableUtils::WriteLong(&os, delta);
+  }
+}
+
+/**
+ * Returns the counter identified by (group, name), creating and registering it on
+ * first use. Serialized by CountersLock; counters are owned by the factory and
+ * deleted in Release().
+ */
+Counter * NativeObjectFactory::GetCounter(const string & group, const string & name) {
+  ScopeLock<Lock> AutoLock(CountersLock);
+  // Search by (group, name). CounterSet is a set<Counter *> ordered by pointer value,
+  // so looking it up with the address of a temporary Counter can never match and
+  // would allocate a duplicate counter on every call.
+  for (size_t i = 0; i < Counters.size(); i++) {
+    Counter * existing = Counters[i];
+    if (existing->group() == group && existing->name() == name) {
+      return existing;
+    }
+  }
+  Counter * ret = new Counter(group, name);
+  Counters.push_back(ret);
+  CounterLastUpdateValues.push_back(0);
+  CounterSet.insert(ret);
+  return ret;
+}
+
+/**
+ * Records a creator for `clz` in NativeTaskClassMap__.
+ * NOTE(review): that map is presumably the class table of the built-in NativeTask
+ * library (declared via the library macros); confirm. No lock is taken here.
+ */
+void NativeObjectFactory::RegisterClass(const string & clz, ObjectCreatorFunc func) {
+  NativeTaskClassMap__[clz] = func;
+}
+
+/** Creates an instance of `clz`, or returns NULL if no library provides it. */
+NativeObject * NativeObjectFactory::CreateObject(const string & clz) {
+  ObjectCreatorFunc creator = GetObjectCreator(clz);
+  if (creator) {
+    return creator();
+  }
+  return NULL;
+}
+
+/**
+ * Finds an exported function by name. Libraries are searched from the most
+ * recently registered to the first, so later libraries override earlier ones.
+ * @return the function, or NULL if no library provides it.
+ */
+void * NativeObjectFactory::GetFunction(const string & funcName) {
+  CheckInit();
+  for (size_t remaining = Libraries.size(); remaining > 0; --remaining) {
+    void * found = Libraries[remaining - 1]->getFunction(funcName);
+    if (NULL != found) {
+      return found;
+    }
+  }
+  return NULL;
+}
+
+/**
+ * Finds the creator for class `clz`, searching newest library first.
+ * @return the creator, or NULL if no library provides `clz`.
+ */
+ObjectCreatorFunc NativeObjectFactory::GetObjectCreator(const string & clz) {
+  CheckInit();
+  for (size_t remaining = Libraries.size(); remaining > 0; --remaining) {
+    ObjectCreatorFunc found = Libraries[remaining - 1]->getObjectCreator(clz);
+    if (NULL != found) {
+      return found;
+    }
+  }
+  return NULL;
+}
+
+/** Deletes an object obtained from CreateObject()/CreateDefaultObject(). */
+void NativeObjectFactory::ReleaseObject(NativeObject * obj) {
+  delete obj;
+}
+
+/**
+ * Loads the library at `path` with symbol prefix `name` and appends it to the
+ * search list.
+ * NOTE(review): takes no lock because Init() calls this while holding FactoryLock.
+ * @return false (and discards the library) if it could not be loaded.
+ */
+bool NativeObjectFactory::RegisterLibrary(const string & path, const string & name) {
+  CheckInit();
+  NativeLibrary * library = new NativeLibrary(path, name);
+  if (!library->init()) {
+    delete library;
+    return false;
+  }
+  Libraries.push_back(library);
+  return true;
+}
+
+// Guards writes to DefaultClasses.
+static Lock DefaultClassesLock;
+
+/** Sets (or replaces) the class CreateDefaultObject() uses for `type`. */
+void NativeObjectFactory::SetDefaultClass(NativeObjectType type, const string & clz) {
+  ScopeLock<Lock> defaultsGuard(DefaultClassesLock);
+  DefaultClasses[type] = clz;
+}
+
+/**
+ * Creates an instance of the default class registered for `type`.
+ * @return the object, or NULL (with a log line) if no default class is registered
+ *         or it cannot be created.
+ */
+NativeObject * NativeObjectFactory::CreateDefaultObject(NativeObjectType type) {
+  CheckInit();
+  map<NativeObjectType, string>::iterator found = DefaultClasses.find(type);
+  if (found == DefaultClasses.end()) {
+    LOG("[NativeObjectLibrary] Default class for NativeObjectType %s not found",
+        NativeObjectTypeToString(type).c_str());
+    return NULL;
+  }
+  string clz = found->second;
+  return CreateObject(clz);
+}
+
+/**
+ * Lexicographic byte comparison; on a common prefix the shorter key sorts first.
+ * @return negative, zero or positive as src sorts before, equal to or after dest.
+ */
+int NativeObjectFactory::BytesComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+
+  uint32_t minlen = std::min(srcLength, destLength);
+  int64_t ret = fmemcmp(src, dest, minlen);
+  if (ret > 0) {
+    return 1;
+  } else if (ret < 0) {
+    return -1;
+  }
+  // Compare lengths explicitly. Returning (int)(srcLength - destLength) narrows an
+  // unsigned difference to int, which is implementation-defined and has the wrong
+  // sign when the lengths differ by 2^31 or more.
+  if (srcLength < destLength) {
+    return -1;
+  } else if (srcLength > destLength) {
+    return 1;
+  }
+  return 0;
+}
+
+/**
+ * Compares single-byte keys (ByteType/BoolType) as signed bytes.
+ * @return negative, zero or positive as src sorts before, equal to or after dest.
+ */
+int NativeObjectFactory::ByteComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  // Plain `char` is signed on x86 but unsigned on e.g. ARM/PowerPC Linux. Convert
+  // through int8_t so every platform gets the signed ordering x86 already produced.
+  return (int)(int8_t)(*src) - (int)(int8_t)(*dest);
+}
+
+/**
+ * Compares 4-byte int keys, presumably serialized big-endian (hence the bswap).
+ * @return negative, zero or positive as src sorts before, equal to or after dest.
+ */
+int NativeObjectFactory::IntComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  // The most significant byte carries the sign: compare it as a signed byte on
+  // every platform (plain char is unsigned on ARM/PowerPC Linux).
+  int result = (int)(int8_t)src[0] - (int)(int8_t)dest[0];
+  if (result == 0) {
+    // Same top byte: the whole value now orders correctly as unsigned. memcpy avoids
+    // the unaligned, aliasing-violating *(uint32_t*) read.
+    uint32_t srcRaw;
+    uint32_t destRaw;
+    memcpy(&srcRaw, src, sizeof(srcRaw));
+    memcpy(&destRaw, dest, sizeof(destRaw));
+    uint32_t from = bswap(srcRaw);
+    uint32_t to = bswap(destRaw);
+    if (from > to) {
+      return 1;
+    } else if (from == to) {
+      return 0;
+    } else {
+      return -1;
+    }
+  }
+  return result;
+}
+
+/**
+ * Compares 8-byte long keys, presumably serialized big-endian (hence the bswap64).
+ * @return negative, zero or positive as src sorts before, equal to or after dest.
+ */
+int NativeObjectFactory::LongComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  // Sign lives in the most significant byte; compare it as signed regardless of
+  // the platform's char signedness.
+  int result = (int)(int8_t)src[0] - (int)(int8_t)dest[0];
+  if (result == 0) {
+    // Same top byte: compare the full values as unsigned. memcpy avoids the
+    // unaligned, aliasing-violating *(uint64_t*) read.
+    uint64_t srcRaw;
+    uint64_t destRaw;
+    memcpy(&srcRaw, src, sizeof(srcRaw));
+    memcpy(&destRaw, dest, sizeof(destRaw));
+    uint64_t from = bswap64(srcRaw);
+    uint64_t to = bswap64(destRaw);
+    if (from > to) {
+      return 1;
+    } else if (from == to) {
+      return 0;
+    } else {
+      return -1;
+    }
+  }
+  return result;
+}
+
+/**
+ * Decodes both keys as variable-length ints and compares them numerically.
+ * @return -1, 0 or 1.
+ */
+int NativeObjectFactory::VIntComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int32_t lhs = WritableUtils::ReadVInt(src, srcLength);
+  const int32_t rhs = WritableUtils::ReadVInt(dest, destLength);
+  if (lhs < rhs) {
+    return -1;
+  }
+  return (lhs > rhs) ? 1 : 0;
+}
+
+/**
+ * Decodes both keys as variable-length longs and compares them numerically.
+ * @return -1, 0 or 1.
+ */
+int NativeObjectFactory::VLongComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int64_t lhs = WritableUtils::ReadVLong(src, srcLength);
+  const int64_t rhs = WritableUtils::ReadVLong(dest, destLength);
+  if (lhs < rhs) {
+    return -1;
+  }
+  return (lhs > rhs) ? 1 : 0;
+}
+
+/**
+ * Compares 4-byte float keys, presumably serialized big-endian (hence the bswap).
+ * @return -1, 0 or 1; if either value is NaN the result is 1.
+ * @throws IOException if either key is not exactly 4 bytes.
+ */
+int NativeObjectFactory::FloatComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  if (srcLength != 4 || destLength != 4) {
+    THROW_EXCEPTION_EX(IOException, "float comparator, while src/dest lengt is not 4");
+  }
+
+  // Copy bytes with memcpy instead of *(uint32_t*)src and *(float*)&bits: those
+  // casts violate strict aliasing and may read unaligned memory.
+  uint32_t fromBits;
+  uint32_t toBits;
+  memcpy(&fromBits, src, sizeof(fromBits));
+  memcpy(&toBits, dest, sizeof(toBits));
+  fromBits = bswap(fromBits);
+  toBits = bswap(toBits);
+
+  float srcValue;
+  float destValue;
+  memcpy(&srcValue, &fromBits, sizeof(srcValue));
+  memcpy(&destValue, &toBits, sizeof(destValue));
+
+  if (srcValue < destValue) {
+    return -1;
+  } else if (srcValue == destValue) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+
+/**
+ * Compares 8-byte double keys, presumably serialized big-endian (hence the bswap64).
+ * @return -1, 0 or 1; if either value is NaN the result is 1.
+ * @throws IOException if either key is not exactly 8 bytes.
+ */
+int NativeObjectFactory::DoubleComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  if (srcLength != 8 || destLength != 8) {
+    // The check is for 8 bytes; the message previously claimed 4.
+    THROW_EXCEPTION_EX(IOException, "double comparator, while src/dest length is not 8");
+  }
+
+  // memcpy instead of pointer casts: avoids strict-aliasing UB and unaligned reads.
+  uint64_t fromBits;
+  uint64_t toBits;
+  memcpy(&fromBits, src, sizeof(fromBits));
+  memcpy(&toBits, dest, sizeof(toBits));
+  fromBits = bswap64(fromBits);
+  toBits = bswap64(toBits);
+
+  double srcValue;
+  double destValue;
+  memcpy(&srcValue, &fromBits, sizeof(srcValue));
+  memcpy(&destValue, &toBits, sizeof(destValue));
+
+  if (srcValue < destValue) {
+    return -1;
+  } else if (srcValue == destValue) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+
+/**
+ * Chooses the key comparator for a job.
+ *
+ * @param keyType        key type, used only when no comparator is named
+ * @param comparatorName name of a comparator function exported by a registered
+ *                       library (resolved via NativeObjectFactory::GetFunction), or NULL
+ * @return the comparator; NULL if the key type has no built-in comparator or the
+ *         named function is not found
+ */
+ComparatorPtr get_comparator(const KeyValueType keyType, const char * comparatorName) {
+  if (NULL != comparatorName) {
+    void * func = NativeObjectFactory::GetFunction(string(comparatorName));
+    return (ComparatorPtr)func;
+  }
+  if (keyType == BytesType || keyType == TextType) {
+    return &NativeObjectFactory::BytesComparator;
+  }
+  if (keyType == ByteType || keyType == BoolType) {
+    return &NativeObjectFactory::ByteComparator;
+  }
+  if (keyType == IntType) {
+    return &NativeObjectFactory::IntComparator;
+  }
+  if (keyType == LongType) {
+    return &NativeObjectFactory::LongComparator;
+  }
+  if (keyType == FloatType) {
+    return &NativeObjectFactory::FloatComparator;
+  }
+  if (keyType == DoubleType) {
+    return &NativeObjectFactory::DoubleComparator;
+  }
+  if (keyType == VIntType) {
+    return &NativeObjectFactory::VIntComparator;
+  }
+  if (keyType == VLongType) {
+    return &NativeObjectFactory::VLongComparator;
+  }
+  return NULL;
+}
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NATIVEOBJECTFACTORY_H_
+#define NATIVEOBJECTFACTORY_H_
+
+#include <string>
+#include <vector>
+#include <set>
+#include <map>
+
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+using std::string;
+using std::vector;
+using std::map;
+using std::set;
+using std::pair;
+
+class NativeLibrary;
+
+/**
+ * Native object factory.
+ *
+ * Every member is static: this class is a process-wide registry/facade rather
+ * than something that gets instantiated. It holds the global Config, the
+ * registered native libraries, the default class name per NativeObjectType,
+ * task progress/status and counters, and it exposes the built-in comparators
+ * for serialized keys.
+ * NOTE(review): the declaration shows no locking; confirm the thread-safety of
+ * this shared static state against NativeObjectFactory.cc.
+ */
+class NativeObjectFactory {
+private:
+ // Native libraries registered so far (presumably via RegisterLibrary()).
+ static vector<NativeLibrary *> Libraries;
+ // Class name per object type (presumably set by SetDefaultClass() and used by CreateDefaultObject()).
+ static map<NativeObjectType, string> DefaultClasses;
+ static Config * GlobalConfig;
+ static float LastProgress;
+ static Progress * TaskProgress;
+ static string LastStatus;
+ static set<Counter *> CounterSet;
+ static vector<Counter *> Counters;
+ // NOTE(review): presumably parallel to Counters, remembering the last value
+ // reported for each counter -- confirm.
+ static vector<uint64_t> CounterLastUpdateValues;
+ static bool Inited;
+public:
+ // Lifecycle of the shared static state.
+ static bool Init();
+ static void Release();
+ static void CheckInit();
+ // Process-wide configuration; the JNI layer fills it from Java key/value pairs.
+ static Config & GetConfig();
+ static Config * GetConfigPtr();
+ // Task progress / status reporting.
+ static void SetTaskProgressSource(Progress * progress);
+ static float GetTaskProgress();
+ static void SetTaskStatus(const string & status);
+ // Fills statusData; the JNI layer hands it back to Java as a byte[].
+ static void GetTaskStatusUpdate(string & statusData);
+ static Counter * GetCounter(const string & group, const string & name);
+ // Registry of object creators / functions keyed by class name.
+ static void RegisterClass(const string & clz, ObjectCreatorFunc func);
+ static NativeObject * CreateObject(const string & clz);
+ // Untyped lookup; callers cast the result (get_comparator casts it to ComparatorPtr).
+ static void * GetFunction(const string & clz);
+ static ObjectCreatorFunc GetObjectCreator(const string & clz);
+ static void ReleaseObject(NativeObject * obj);
+ // Returns true on success (the JNI layer reports that to Java as status 0).
+ static bool RegisterLibrary(const string & path, const string & name);
+ static void SetDefaultClass(NativeObjectType type, const string & clz);
+ static NativeObject * CreateDefaultObject(NativeObjectType type);
+ // Built-in comparators over serialized keys: compare (src, srcLength) with
+ // (dest, destLength) and return an int ordering result (presumably <0 / 0 / >0
+ // like memcmp; DoubleComparator returns -1 / 0 / 1). get_comparator() selects
+ // one of these by KeyValueType.
+ static int BytesComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+ static int ByteComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+ static int IntComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+ static int LongComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+ static int VIntComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+ static int VLongComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+ static int FloatComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+ static int DoubleComparator(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+};
+
+} // namespace NativeTask
+
+#endif /* NATIVEOBJECTFACTORY_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef QUICK_BUILD
+#include "org_apache_hadoop_mapred_nativetask_NativeRuntime.h"
+#endif
+#include "commons.h"
+#include "jniutils.h"
+#include "NativeObjectFactory.h"
+
+using namespace NativeTask;
+
+///////////////////////////////////////////////////////////////
+// NativeRuntime JNI methods
+///////////////////////////////////////////////////////////////
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRelease
+ * Signature: ()V
+ *
+ * Releases the process-wide NativeObjectFactory state. Native exceptions are
+ * rethrown into Java as the matching Java exception type; a
+ * NativeTask::JavaException is only logged and left for the Java side.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(
+    JNIEnv * jenv, jclass nativeRuntimeClass) {
+  try {
+    NativeTask::NativeObjectFactory::Release();
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("[NativeRuntimeJniImpl] JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    // Anything reaching here is NOT a std::exception, so no what() text exists.
+    JNU_ThrowByName(jenv, "java/io/IOException", "[NativeRuntimeJniImpl] Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIConfigure
+ * Signature: ([[B)V
+ *
+ * Copies configuration into the global NativeTask Config. configs is a flat
+ * array of byte[] pairs: element 2k is a key, element 2k+1 its value. A trailing
+ * unpaired element is ignored.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jobjectArray configs) {
+  try {
+    NativeTask::Config & config = NativeTask::NativeObjectFactory::GetConfig();
+    jsize len = jenv->GetArrayLength(configs);
+    for (jsize i = 0; i + 1 < len; i += 2) {
+      jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i);
+      jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1);
+      config.set(JNU_ByteArrayToString(jenv, key_obj), JNU_ByteArrayToString(jenv, val_obj));
+      // Every GetObjectArrayElement call creates a JNI local reference that lives
+      // until this native method returns; free them per pair so a large config
+      // array cannot exhaust the local reference table.
+      jenv->DeleteLocalRef(key_obj);
+      jenv->DeleteLocalRef(val_obj);
+    }
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    // Anything reaching here is NOT a std::exception, so no what() text exists.
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateNativeObject
+ * Signature: ([B)J
+ *
+ * Creates the native object registered under the class name in clazz and
+ * returns its address, or 0 when creation failed with an exception (which is
+ * rethrown into Java unless it was a NativeTask::JavaException).
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray clazz) {
+  try {
+    std::string typeString = JNU_ByteArrayToString(jenv, clazz);
+    return (jlong)(NativeTask::NativeObjectFactory::CreateObject(typeString));
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return 0;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateDefaultNativeObject
+ * Signature: ([B)J
+ *
+ * Creates the default native object for the NativeObjectType named in type
+ * (e.g. "MapperType") and returns its address, or 0 when an exception occurred.
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray type) {
+  try {
+    std::string typeString = JNU_ByteArrayToString(jenv, type);
+    // Named objectType so it does not shadow the jbyteArray parameter `type`.
+    NativeTask::NativeObjectType objectType = NativeTask::NativeObjectTypeFromString(typeString);
+    return (jlong)(NativeTask::NativeObjectFactory::CreateDefaultObject(objectType));
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("[NativeRuntimeJniImpl] JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "[NativeRuntimeJniImpl] Unknown exception");
+  }
+  return 0;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIReleaseNativeObject
+ * Signature: (J)V
+ *
+ * Hands the NativeObject at address objectAddr back to
+ * NativeObjectFactory::ReleaseObject. A zero address raises
+ * IllegalArgumentException in Java instead.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jlong objectAddr) {
+  try {
+    NativeTask::NativeObject * target = reinterpret_cast<NativeTask::NativeObject *>(objectAddr);
+    if (target != NULL) {
+      NativeTask::NativeObjectFactory::ReleaseObject(target);
+    } else {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "Object addr not instance of NativeObject");
+    }
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Leave the pending Java-side exception alone.
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRegisterModule
+ * Signature: ([B[B)I
+ *
+ * Registers the native library at modulePath under moduleName.
+ * Returns 0 when NativeObjectFactory::RegisterLibrary reports success and 1 in
+ * every other case, including when an exception was caught.
+ */
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray modulePath, jbyteArray moduleName) {
+  jint status = 1;
+  try {
+    const std::string libraryPath = JNU_ByteArrayToString(jenv, modulePath);
+    const std::string libraryName = JNU_ByteArrayToString(jenv, moduleName);
+    const bool registered = NativeTask::NativeObjectFactory::RegisterLibrary(libraryPath, libraryName);
+    status = registered ? 0 : 1;
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Leave the pending Java-side exception alone.
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return status;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIUpdateStatus
+ * Signature: ()[B
+ *
+ * Returns the status data produced by NativeObjectFactory::GetTaskStatusUpdate
+ * as a Java byte[]. Returns NULL when the array cannot be allocated or when an
+ * exception was caught.
+ */
+JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus(
+    JNIEnv * jenv, jclass nativeRuntimeClass) {
+  try {
+    std::string statusData;
+    NativeTask::NativeObjectFactory::GetTaskStatusUpdate(statusData);
+    const jsize length = (jsize)statusData.length();
+    jbyteArray ret = jenv->NewByteArray(length);
+    if (NULL == ret) {
+      // NewByteArray failed and (per the JNI spec) an OutOfMemoryError is already
+      // pending; SetByteArrayRegion must not be called on a NULL array.
+      return NULL;
+    }
+    jenv->SetByteArrayRegion(ret, 0, length, (jbyte *)statusData.c_str());
+    return ret;
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return NULL;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CYGWIN__
+#include <execinfo.h>
+#endif
+#include <stdlib.h>
+#include "commons.h"
+#include "util/Hash.h"
+#include "util/StringUtil.h"
+#include "NativeTask.h"
+#include "NativeObjectFactory.h"
+
+namespace NativeTask {
+
+//////////////////////////////////////////////////////////////////
+// NativeObjectType methods
+//////////////////////////////////////////////////////////////////
+
+/**
+ * Returns the canonical name of a NativeObjectType (the enumerator spelled as
+ * text), or "UnknownObjectType" for any value without a dedicated name.
+ */
+const string NativeObjectTypeToString(NativeObjectType type) {
+  const char * label = "UnknownObjectType";
+  switch (type) {
+  case BatchHandlerType:
+    label = "BatchHandlerType";
+    break;
+  case MapperType:
+    label = "MapperType";
+    break;
+  case ReducerType:
+    label = "ReducerType";
+    break;
+  case PartitionerType:
+    label = "PartitionerType";
+    break;
+  case CombinerType:
+    label = "CombinerType";
+    break;
+  case FolderType:
+    label = "FolderType";
+    break;
+  case RecordReaderType:
+    label = "RecordReaderType";
+    break;
+  case RecordWriterType:
+    label = "RecordWriterType";
+    break;
+  default:
+    break;
+  }
+  return string(label);
+}
+
+/**
+ * Parses a NativeObjectType from its canonical name; the inverse of
+ * NativeObjectTypeToString().
+ *
+ * @param type exact enumerator name, e.g. "MapperType"
+ * @return the matching type, or UnknownObjectType if the name is not recognized
+ */
+NativeObjectType NativeObjectTypeFromString(const string type) {
+  if (type == "BatchHandlerType") {
+    return BatchHandlerType;
+  } else if (type == "MapperType") {
+    return MapperType;
+  } else if (type == "ReducerType") {
+    return ReducerType;
+  } else if (type == "PartitionerType") {
+    return PartitionerType;
+  } else if (type == "CombinerType") {
+    return CombinerType;
+  } else if (type == "FolderType") {
+    // Must map to FolderType (not CombinerType) so the name round-trips
+    // through NativeObjectTypeToString().
+    return FolderType;
+  } else if (type == "RecordReaderType") {
+    return RecordReaderType;
+  } else if (type == "RecordWriterType") {
+    return RecordWriterType;
+  }
+  return UnknownObjectType;
+}
+
+/**
+ * Builds the exception text from what, plus (outside Cygwin) the native call
+ * stack at construction time.
+ *
+ * When what starts with '/', everything before the last '/' that precedes the
+ * first ':' is dropped, e.g. "/a/b/File.cc:12: msg" becomes "/File.cc:12: msg".
+ */
+HadoopException::HadoopException(const string & what) {
+  // n ends on the last '/' located before the first ':' (0 if there is none).
+  size_t n = 0;
+  if (!what.empty() && what[0] == '/') {
+    size_t p = what.find(':');
+    if (p != what.npos) {
+      while (true) {
+        size_t np = what.find('/', n + 1);
+        if (np == what.npos || np >= p) {
+          break;
+        }
+        n = np;
+      }
+    }
+  }
+  _reason.append(what.c_str() + n, what.length() - n);
+
+#ifndef __CYGWIN__
+  // Append up to 64 stack frames, one per line.
+  void * frames[64];
+  int frameCount = backtrace(frames, 64);
+  char ** symbols = backtrace_symbols(frames, frameCount);
+  if (NULL != symbols) {
+    for (int i = 0; i < frameCount; i++) {
+      _reason.append("\n\t");
+      _reason.append(symbols[i]);
+    }
+    // backtrace_symbols() returns one malloc()ed block that the caller must free;
+    // without this every exception constructed leaked it.
+    free(symbols);
+  }
+#endif
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Reads one complete line from fin into line, however long it is.
+ *
+ * fgets() returns at most sizeof(buff) - 1 characters per call, so chunks are
+ * appended until a '\n' arrives or the stream ends.
+ *
+ * @return true if anything was read (a final line may lack its '\n')
+ */
+static bool readWholeLine(FILE * fin, string & line) {
+  char buff[256];
+  line.clear();
+  while (fgets(buff, sizeof(buff), fin) != NULL) {
+    line.append(buff);
+    if (!line.empty() && line[line.length() - 1] == '\n') {
+      return true;
+    }
+  }
+  return !line.empty();
+}
+
+/**
+ * Loads "key=value" lines from the file at path into this config.
+ *
+ * Lines starting with '#' are comments and lines without '=' are ignored. The
+ * key is the text before the first '=' (kept verbatim); the value is the rest
+ * of the line passed through StringUtil::Trim. Long lines and a last line
+ * without a trailing newline are read whole instead of being split or dropped.
+ *
+ * @throws IOException if the file cannot be opened for reading
+ */
+void Config::load(const string & path) {
+  FILE * fin = fopen(path.c_str(), "r");
+  if (NULL == fin) {
+    THROW_EXCEPTION(IOException, "file not found or can not open for read");
+  }
+  string line;
+  while (readWholeLine(fin, line)) {
+    if (line[0] == '#') {
+      continue;
+    }
+    size_t br = line.find('=');
+    if (br != line.npos) {
+      set(line.substr(0, br), StringUtil::Trim(line.substr(br + 1)));
+    }
+  }
+  fclose(fin);
+}
+
+/** Stores value under key, overwriting any previous value. */
+void Config::set(const string & key, const string & value) {
+  map<string, string>::iterator slot = _configs.find(key);
+  if (slot == _configs.end()) {
+    _configs.insert(std::make_pair(key, value));
+  } else {
+    slot->second = value;
+  }
+}
+
+/** Stores value, converted with StringUtil::ToString, under name. */
+void Config::setInt(const string & name, int64_t value) {
+  const string text = StringUtil::ToString(value);
+  set(name, text);
+}
+
+/** Stores value, converted with StringUtil::ToString, under name. */
+void Config::setBool(const string & name, bool value) {
+  const string text = StringUtil::ToString(value);
+  set(name, text);
+}
+
+/**
+ * Applies "key=value" command-line style arguments.
+ *
+ * Arguments without '=' or starting with '-' are logged and skipped. When a key
+ * is already present, the new value is appended after a ',' so repeated keys
+ * accumulate into a comma-separated list.
+ */
+void Config::parse(int32_t argc, const char ** argv) {
+  for (int32_t idx = 0; idx < argc; ++idx) {
+    const char * arg = argv[idx];
+    const char * separator = strchr(arg, '=');
+    if (separator == NULL) {
+      LOG("[NativeTask] config argument not recognized: %s", arg);
+    } else if ('-' == arg[0]) {
+      LOG("[NativeTask] config argument with '-' prefix ignored: %s", arg);
+    } else {
+      const string key(arg, separator - arg);
+      const string value(separator + 1);
+      map<string, string>::iterator existing = _configs.find(key);
+      if (existing != _configs.end()) {
+        existing->second.append(",").append(value);
+      } else {
+        _configs[key] = value;
+      }
+    }
+  }
+}
+
+/**
+ * Looks up name; returns a pointer into the stored value (valid until the entry
+ * changes) or NULL when the key is absent.
+ */
+const char * Config::get(const string & name) {
+  map<string, string>::iterator found = _configs.find(name);
+  if (found != _configs.end()) {
+    return found->second.c_str();
+  }
+  return NULL;
+}
+
+/** Returns a copy of the value stored under name, or defaultValue when absent. */
+string Config::get(const string & name, const string & defaultValue) {
+  map<string, string>::iterator found = _configs.find(name);
+  if (found != _configs.end()) {
+    return found->second;
+  }
+  return defaultValue;
+}
+
+/** Returns the value under name parsed by StringUtil::toInt, or defaultValue when absent. */
+int64_t Config::getInt(const string & name, int64_t defaultValue) {
+  map<string, string>::iterator found = _configs.find(name);
+  if (found != _configs.end()) {
+    return StringUtil::toInt(found->second);
+  }
+  return defaultValue;
+}
+
+/** Returns the value under name parsed by StringUtil::toBool, or defaultValue when absent. */
+bool Config::getBool(const string & name, bool defaultValue) {
+  map<string, string>::iterator found = _configs.find(name);
+  if (found != _configs.end()) {
+    return StringUtil::toBool(found->second);
+  }
+  return defaultValue;
+}
+
+/** Returns the value under name parsed by StringUtil::toFloat, or defaultValue when absent. */
+float Config::getFloat(const string & name, float defaultValue) {
+  map<string, string>::iterator found = _configs.find(name);
+  if (found != _configs.end()) {
+    return StringUtil::toFloat(found->second);
+  }
+  return defaultValue;
+}
+
+/**
+ * Appends the comma-separated parts of the value under name to dest (split via
+ * StringUtil::Split); dest is left untouched when the key is absent.
+ */
+void Config::getStrings(const string & name, vector<string> & dest) {
+  map<string, string>::iterator found = _configs.find(name);
+  if (found == _configs.end()) {
+    return;
+  }
+  StringUtil::Split(found->second, ",", dest, true);
+}
+
+/** Appends each comma-separated part of the value under name, parsed by StringUtil::toInt. */
+void Config::getInts(const string & name, vector<int64_t> & dest) {
+  vector<string> parts;
+  getStrings(name, parts);
+  for (vector<string>::iterator part = parts.begin(); part != parts.end(); ++part) {
+    dest.push_back(StringUtil::toInt(*part));
+  }
+}
+
+/** Appends each comma-separated part of the value under name, parsed by StringUtil::toFloat. */
+void Config::getFloats(const string & name, vector<float> & dest) {
+  vector<string> parts;
+  getStrings(name, parts);
+  for (vector<string>::iterator part = parts.begin(); part != parts.end(); ++part) {
+    dest.push_back(StringUtil::toFloat(*part));
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Counter lookup for ProcessorBase: this implementation provides no counters and
+ * returns NULL for every (group, name) pair.
+ */
+Counter * ProcessorBase::getCounter(const string & group, const string & name) {
+  (void)group;  // unused here
+  (void)name;   // unused here
+  return NULL;
+}
+
+/**
+ * Picks the partition for a serialized key by hashing its bytes.
+ *
+ * @param key          serialized key bytes
+ * @param keyLen       number of bytes in key
+ * @param numPartition number of partitions
+ * @return a partition index in [0, numPartition); 0 when numPartition <= 1
+ */
+uint32_t Partitioner::getPartition(const char * key, uint32_t & keyLen, uint32_t numPartition) {
+  // One partition needs no hashing. Zero partitions would make the modulo below
+  // divide by zero (undefined behavior / SIGFPE), so it also yields 0.
+  if (numPartition <= 1) {
+    return 0;
+  }
+  // Keep only the low 31 bits so the hash is non-negative before the modulo.
+  return (Hash::BytesHash(key, keyLen) & 0x7fffffff) % numPartition;
+}
+
+///////////////////////////////////////////////////////////
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "NativeObjectFactory.h"
+#include "PartitionBucket.h"
+#include "Merge.h"
+#include "NativeTask.h"
+#include "WritableUtils.h"
+#include "util/DualPivotQuickSort.h"
+#include "Combiner.h"
+#include "TaskCounters.h"
+#include "MinHeap.h"
+#include "PartitionBucketIterator.h"
+
+namespace NativeTask {
+
+/**
+ * Creates an iterator that merges the records of all memory blocks of this bucket.
+ *
+ * @return a new PartitionBucketIterator owned by the caller (spill() deletes it),
+ *         or NULL when the bucket has no memory blocks
+ */
+KVIterator * PartitionBucket::getIterator() {
+  KVIterator * merged = NULL;
+  if (!_memBlocks.empty()) {
+    merged = new PartitionBucketIterator(this, _keyComparator);
+  }
+  return merged;
+}
+
+/**
+ * Writes every buffered record of this bucket to writer, in iterator order.
+ *
+ * Records pass through the combine runner when one is configured; otherwise each
+ * key/value pair is written as-is. Does nothing when writer is NULL or the bucket
+ * holds no memory blocks.
+ */
+void PartitionBucket::spill(IFileWriter * writer) throw (IOException, UnsupportException) {
+  // Check the writer before creating the iterator so nothing is allocated (and
+  // leaked) when there is nowhere to write.
+  if (NULL == writer) {
+    return;
+  }
+  KVIterator * iterator = getIterator();
+  if (NULL == iterator) {
+    return;
+  }
+
+  try {
+    if (_combineRunner == NULL) {
+      Buffer key;
+      Buffer value;
+
+      while (iterator->next(key, value)) {
+        writer->write(key.data(), key.length(), value.data(), value.length());
+      }
+    } else {
+      _combineRunner->combine(CombineContext(UNKNOWN), iterator, writer);
+    }
+  } catch (...) {
+    // Release the iterator before propagating a write/combine failure.
+    delete iterator;
+    throw;
+  }
+  delete iterator;
+}
+
+/**
+ * Sorts the records inside each memory block with the key comparator.
+ *
+ * Blocks are sorted independently; nothing happens when the bucket is empty or
+ * has already been sorted since the last allocateKVBuffer() call.
+ */
+void PartitionBucket::sort(SortAlgorithm type) {
+  if (_memBlocks.empty() || _sorted) {
+    return;
+  }
+  for (std::vector<MemoryBlock *>::iterator block = _memBlocks.begin();
+      block != _memBlocks.end(); ++block) {
+    (*block)->sort(type, _keyComparator);
+  }
+  _sorted = true;
+}
+
+}
+;
+// namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PARTITION_BUCKET_H_
+#define PARTITION_BUCKET_H_
+
+#include "NativeTask.h"
+#include "MemoryPool.h"
+#include "MemoryBlock.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+
+namespace NativeTask {
+
+/**
+ * Buffer for a single partition.
+ *
+ * Serialized key/value records are stored in a list of MemoryBlocks whose memory
+ * comes from a MemoryPool. The bucket deletes its MemoryBlock objects in reset()
+ * and in the destructor. The records can be sorted per block, iterated through
+ * PartitionBucketIterator and spilled to an IFileWriter.
+ */
+class PartitionBucket {
+  friend class PartitionBucketIterator;
+  friend class TestPartitionBucket;
+
+private:
+  std::vector<MemoryBlock *> _memBlocks;
+  MemoryPool * _pool;
+  uint32_t _partition;
+  uint32_t _blockSize;
+  ComparatorPtr _keyComparator;
+  ICombineRunner * _combineRunner;
+  bool _sorted;
+
+public:
+  /**
+   * @param pool          pool that memory blocks are allocated from (must not be NULL)
+   * @param partition     id of the partition buffered here
+   * @param comparator    raw key comparator used for sorting (must not be NULL)
+   * @param combineRunner optional combiner used by spill(); may be NULL
+   * @param blockSize     preferred size of each newly allocated memory block
+   * @throws IOException if pool or comparator is NULL
+   *
+   * The initializer list follows member declaration order, which is the order
+   * in which members are actually initialized.
+   */
+  PartitionBucket(MemoryPool * pool, uint32_t partition, ComparatorPtr comparator,
+      ICombineRunner * combineRunner, uint32_t blockSize)
+      : _pool(pool), _partition(partition), _blockSize(blockSize), _keyComparator(comparator),
+        _combineRunner(combineRunner), _sorted(false) {
+    if (NULL == _pool || NULL == comparator) {
+      THROW_EXCEPTION_EX(IOException, "pool is NULL, or comparator is not set");
+    }
+
+    if (NULL != combineRunner) {
+      LOG("[PartitionBucket] combine runner has been set");
+    }
+  }
+
+  ~PartitionBucket() {
+    reset();
+  }
+
+  uint32_t getPartitionId() {
+    return _partition;
+  }
+
+  /** Deletes all memory blocks and empties the block list. */
+  void reset() {
+    for (uint32_t i = 0; i < _memBlocks.size(); i++) {
+      if (NULL != _memBlocks[i]) {
+        delete _memBlocks[i];
+        _memBlocks[i] = NULL;
+      }
+    }
+    _memBlocks.clear();
+  }
+
+  KVIterator * getIterator();
+
+  /** Total number of records over all memory blocks. */
+  uint32_t getKVCount() const {
+    uint32_t size = 0;
+    for (uint32_t i = 0; i < _memBlocks.size(); i++) {
+      MemoryBlock * block = _memBlocks[i];
+      if (NULL != block) {
+        size += block->getKVCount();
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Reserves kvLength bytes for one serialized record.
+   *
+   * The last memory block is used when it still has room; otherwise a new block
+   * of at least kvLength bytes (preferably _blockSize) is requested from the pool.
+   * Any non-empty request marks the bucket as unsorted.
+   *
+   * @return the reserved buffer, or NULL when kvLength is 0 or the pool cannot
+   *         provide another block
+   * @throws OutOfMemoryException if total_length > io.sort.mb
+   */
+  KVBuffer * allocateKVBuffer(uint32_t kvLength) {
+    if (kvLength == 0) {
+      LOG("KV Length is empty, no need to allocate buffer for it");
+      return NULL;
+    }
+    _sorted = false;
+    MemoryBlock * memBlock = NULL;
+    uint32_t blockCount = _memBlocks.size();
+    if (blockCount > 0) {
+      memBlock = _memBlocks[blockCount - 1];
+    }
+    // Guard on the block pointer itself before dereferencing it.
+    if (NULL != memBlock && memBlock->remainSpace() >= kvLength) {
+      return memBlock->allocateKVBuffer(kvLength);
+    } else {
+      uint32_t min = kvLength;
+      uint32_t expect = std::max(_blockSize, min);
+      uint32_t allocated = 0;
+      char * buff = _pool->allocate(min, expect, allocated);
+      if (NULL != buff) {
+        memBlock = new MemoryBlock(buff, allocated);
+        _memBlocks.push_back(memBlock);
+        return memBlock->allocateKVBuffer(kvLength);
+      } else {
+        LOG("MemoryPool is full, fail to allocate new MemBlock, block size: %u, kv length: %u", expect, kvLength);
+      }
+    }
+    return NULL;
+  }
+
+  void sort(SortAlgorithm type);
+
+  void spill(IFileWriter * writer) throw (IOException, UnsupportException);
+
+  uint32_t getMemoryBlockCount() const {
+    return _memBlocks.size();
+  }
+
+  /** Returns the block at index; index must be below getMemoryBlockCount(). */
+  MemoryBlock * getMemoryBlock(uint32_t index) const {
+    return _memBlocks[index];
+  }
+};
+
+}
+;
+//namespace NativeTask
+
+#endif /* PARTITION_BUCKET_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "NativeObjectFactory.h"
+#include "PartitionBucketIterator.h"
+#include "Merge.h"
+#include "NativeTask.h"
+#include "WritableUtils.h"
+#include "util/DualPivotQuickSort.h"
+#include "Combiner.h"
+#include "TaskCounters.h"
+#include "MinHeap.h"
+
+namespace NativeTask {
+
+/////////////////////////////////////////////////////////////////
+// PartitionBucket
+/////////////////////////////////////////////////////////////////
+
+PartitionBucketIterator::PartitionBucketIterator(PartitionBucket * pb, ComparatorPtr comparator)
+ : _pb(pb), _comparator(comparator), _first(true) {
+ uint32_t blockCount = _pb->getMemoryBlockCount();
+ for (uint32_t i = 0; i < blockCount; i++) {
+ MemoryBlock * block = _pb->getMemoryBlock(i);
+ MemBlockIteratorPtr blockIterator = new MemBlockIterator(block);
+ if (blockIterator->next()) {
+ _heap.push_back(blockIterator);
+ }
+ }
+ if (_heap.size() > 0) {
+ makeHeap(&(_heap[0]), &(_heap[0]) + _heap.size(), _comparator);
+ }
+}
+
+PartitionBucketIterator::~PartitionBucketIterator() {
+ for (uint32_t i = 0; i < _heap.size(); i++) {
+ MemBlockIteratorPtr ptr = _heap[i];
+ if (NULL != ptr) {
+ delete ptr;
+ _heap[i] = NULL;
+ }
+ }
+}
+
+bool PartitionBucketIterator::next() {
+ size_t cur_heap_size = _heap.size();
+ if (cur_heap_size > 0) {
+ if (!_first) {
+ if (_heap[0]->next()) { // have more, adjust heap
+ if (cur_heap_size == 1) {
+ return true;
+ } else if (cur_heap_size == 2) {
+ MemBlockIteratorPtr * base = &(_heap[0]);
+
+ if (_comparator(base[1], base[0])) {
+ std::swap(base[0], base[1]);
+ }
+ } else {
+ MemBlockIteratorPtr * base = &(_heap[0]);
+ heapify(base, 1, cur_heap_size, _comparator);
+ }
+ } else { // no more, pop heap
+ MemBlockIteratorPtr * base = &(_heap[0]);
+ popHeap(base, base + cur_heap_size, _comparator);
+ _heap.pop_back();
+ }
+ } else {
+ _first = false;
+ }
+ return _heap.size() > 0;
+ }
+ return false;
+}
+
+bool PartitionBucketIterator::next(Buffer & key, Buffer & value) {
+ bool result = next();
+ if (result) {
+ MemBlockIteratorPtr * base = &(_heap[0]);
+ KVBuffer * kvBuffer = base[0]->getKVBuffer();
+
+ key.reset(kvBuffer->getKey(), kvBuffer->keyLength);
+ value.reset(kvBuffer->getValue(), kvBuffer->valueLength);
+
+ return true;
+ }
+ return false;
+}
+}
+;
+// namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PARTITION_BUCKET_ITERATOR_H_
+#define PARTITION_BUCKET_ITERATOR_H_
+
+#include "NativeTask.h"
+#include "MemoryPool.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+#include "PartitionBucket.h"
+
+namespace NativeTask {
+
+class PartitionBucketIterator : public KVIterator {
+protected:
+ PartitionBucket * _pb;
+ std::vector<MemBlockIteratorPtr> _heap;
+ MemBlockComparator _comparator;
+ bool _first;
+
+public:
+ PartitionBucketIterator(PartitionBucket * pb, ComparatorPtr comparator);
+ virtual ~PartitionBucketIterator();
+ virtual bool next(Buffer & key, Buffer & value);
+
+private:
+ bool next();
+};
+
+}
+;
+//namespace NativeTask
+
+#endif /* PARTITION_BUCKET_ITERATOR_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Path.h"
+
+namespace NativeTask {
+
+bool Path::IsAbsolute(const string & path) {
+ if (path.length() > 0 && path[0] == '/') {
+ return true;
+ }
+ return false;
+}
+
+string Path::GetParent(const string & path) {
+ size_t lastSlash = path.rfind('/');
+ if (lastSlash == path.npos) {
+ return ".";
+ }
+ if (lastSlash == 0 && path.length() == 1) {
+ return "";
+ }
+ if (lastSlash == 0) {
+ return path;
+ }
+ return path.substr(0, lastSlash);
+}
+
+string Path::GetName(const string & path) {
+ size_t lastSlash = path.rfind('/');
+ if (lastSlash == path.npos) {
+ return path;
+ }
+ return path.substr(lastSlash + 1);
+}
+
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PATH_H_
+#define PATH_H_
+
+#include <stdint.h>
+#include <string>
+
namespace NativeTask {

using std::string;

/**
 * Stateless helpers that split '/'-separated path strings.
 * All members are static; the class carries no data.
 */
class Path {
public:
  /// True when the path is non-empty and begins with '/'.
  static bool IsAbsolute(const string & path);

  /// Directory part of the path; inputs without '/' and root-level inputs are
  /// special-cased by the definition.
  static string GetParent(const string & path);

  /// Final component of the path (text after the last '/').
  static string GetName(const string & path);
};

} // namespace NativeTask
+
+#endif /* PATH_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "Streams.h"
+#include "FileSystem.h"
+#include "Buffers.h"
+#include "SpillInfo.h"
+
+namespace NativeTask {
+
+void SingleSpillInfo::deleteSpillFile() {
+ if (path.length() > 0) {
+ struct stat st;
+ if (0 == stat(path.c_str(), &st)) {
+ remove(path.c_str());
+ }
+ }
+}
+
+void SingleSpillInfo::writeSpillInfo(const std::string & filepath) {
+ OutputStream * fout = FileSystem::getLocal().create(filepath, true);
+ {
+ ChecksumOutputStream dest = ChecksumOutputStream(fout, CHECKSUM_CRC32);
+ AppendBuffer appendBuffer;
+ appendBuffer.init(32 * 1024, &dest, "");
+ uint64_t base = 0;
+
+ for (size_t j = 0; j < this->length; j++) {
+ IFileSegment * segment = &(this->segments[j]);
+ const bool firstSegment = (j == 0);
+ if (firstSegment) {
+ appendBuffer.write_uint64_be(base);
+ appendBuffer.write_uint64_be(segment->uncompressedEndOffset);
+ appendBuffer.write_uint64_be(segment->realEndOffset);
+ } else {
+ appendBuffer.write_uint64_be(base + this->segments[j - 1].realEndOffset);
+ appendBuffer.write_uint64_be(
+ segment->uncompressedEndOffset - this->segments[j - 1].uncompressedEndOffset);
+ appendBuffer.write_uint64_be(segment->realEndOffset - this->segments[j - 1].realEndOffset);
+ }
+ }
+ appendBuffer.flush();
+ uint32_t chsum = dest.getChecksum();
+#ifdef SPILLRECORD_CHECKSUM_UINT
+ chsum = bswap(chsum);
+ fout->write(&chsum, sizeof(uint32_t));
+#else
+ uint64_t wtchsum = bswap64((uint64_t)chsum);
+ fout->write(&wtchsum, sizeof(uint64_t));
+#endif
+ }
+ fout->close();
+ delete fout;
+}
+
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PARTITIONINDEX_H_
+#define PARTITIONINDEX_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace NativeTask {
+
+using std::string;
+
/**
 * Boundaries of one segment inside a spill file.
 *
 * Both members are end positions measured from the start of the file and are
 * cumulative across a spill's segments. Per-segment lengths are obtained by
 * subtracting the previous segment's values.
 */
struct IFileSegment {
  uint64_t uncompressedEndOffset;  // end position in the uncompressed stream
  uint64_t realEndOffset;          // end position in the compressed (as-written) stream
};
+
+class SingleSpillInfo {
+public:
+ uint32_t length;
+ std::string path;
+ IFileSegment * segments;
+ ChecksumType checkSumType;
+ KeyValueType keyType;
+ KeyValueType valueType;
+ std::string codec;
+
+ SingleSpillInfo(IFileSegment * segments, uint32_t len, const string & path, ChecksumType checksum,
+ KeyValueType ktype, KeyValueType vtype, const string & inputCodec)
+ : length(len), path(path), segments(segments), checkSumType(checksum), keyType(ktype),
+ valueType(vtype), codec(inputCodec) {
+ }
+
+ ~SingleSpillInfo() {
+ delete[] segments;
+ }
+
+ void deleteSpillFile();
+
+ uint64_t getEndPosition() {
+ return segments ? segments[length - 1].uncompressedEndOffset : 0;
+ }
+
+ uint64_t getRealEndPosition() {
+ return segments ? segments[length - 1].realEndOffset : 0;
+ }
+
+ void writeSpillInfo(const std::string & filepath);
+};
+
+class SpillInfos {
+public:
+ std::vector<SingleSpillInfo*> spills;
+ SpillInfos() {
+ }
+
+ ~SpillInfos() {
+ for (size_t i = 0; i < spills.size(); i++) {
+ delete spills[i];
+ }
+ spills.clear();
+ }
+
+ void deleteAllSpillFiles() {
+ for (size_t i = 0; i < spills.size(); i++) {
+ spills[i]->deleteSpillFile();
+ }
+ }
+
+ void add(SingleSpillInfo * sri) {
+ spills.push_back(sri);
+ }
+
+ uint32_t getSpillCount() const {
+ return spills.size();
+ }
+
+ SingleSpillInfo* getSingleSpillInfo(int index) {
+ return spills.at(index);
+ }
+};
+
+} // namespace NativeTask
+
+#endif /* PARTITIONINDEX_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SPILL_OUTPUT_SERVICE_H_
+#define SPILL_OUTPUT_SERVICE_H_
+
+#include <stdint.h>
+#include <string>
+
namespace NativeTask {

class CombineHandler;

using std::string;

/**
 * Abstract provider of output locations and of a combine handler.
 *
 * The virtual destructor lets implementations be deleted through a base pointer.
 * NOTE(review): ownership of the returned string/handler pointers is not
 * visible here — confirm with the implementing class before freeing them.
 */
class SpillOutputService {
public:
  virtual ~SpillOutputService() {
  }

  // Location accessors (declaration order kept to preserve vtable layout).
  virtual std::string * getSpillPath() = 0;
  virtual std::string * getOutputPath() = 0;
  virtual std::string * getOutputIndexPath() = 0;

  // Presumably the Java-side combiner bridge; CombineHandler is opaque here.
  virtual CombineHandler * getJavaCombineHandler() = 0;
};

} // namespace NativeTask
+
+#endif /* SPILL_OUTPUT_SERVICE_H_ */