You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [9/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,163 @@
+// Copyright 2005 and onwards Google Inc.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// A light-weight compression algorithm. It is designed for speed of
+// compression and decompression, rather than for the utmost in space
+// savings.
+//
+// For getting better compression ratios when you are compressing data
+// with long repeated sequences or compressing data that is similar to
+// other data, while still compressing fast, you might look at first
+// using BMDiff and then compressing the output of BMDiff with
+// Snappy.
+
+#ifndef UTIL_SNAPPY_SNAPPY_H__
+#define UTIL_SNAPPY_SNAPPY_H__
+
+#include <stddef.h>
+#include <string>
+
+#include "snappy-stubs-public.h"
+
+namespace snappy {
+ // Forward declarations of the byte source/sink abstractions used by the
+ // generic routines below; this header only refers to them through pointers.
+ class Source;
+ class Sink;
+
+ // ------------------------------------------------------------------------
+ // Generic compression/decompression routines.
+ // ------------------------------------------------------------------------
+
+ // Compress the bytes read from "*source" and append to "*sink". Return the
+ // number of bytes written.
+ size_t Compress(Source* source, Sink* sink);
+
+ // Find the uncompressed length of the given stream, as given by the header.
+ // Note that the true length could deviate from this; the stream could e.g.
+ // be truncated.
+ //
+ // Also note that this leaves "*source" in a state that is unsuitable for
+ // further operations, such as RawUncompress(). You will need to rewind
+ // or recreate the source yourself before attempting any further calls.
+ bool GetUncompressedLength(Source* source, uint32* result);
+
+ // ------------------------------------------------------------------------
+ // Higher-level string based routines (should be sufficient for most users)
+ // ------------------------------------------------------------------------
+
+ // Sets "*output" to the compressed version of "input[0,input_length-1]".
+ // Original contents of *output are lost.
+ //
+ // REQUIRES: "input[]" is not an alias of "*output".
+ //
+ // NOTE(review): the return value is presumably the size of the compressed
+ // data stored in "*output" -- confirm against the implementation.
+ size_t Compress(const char* input, size_t input_length, string* output);
+
+ // Decompresses "compressed[0,compressed_length-1]" to "*uncompressed".
+ // Original contents of "*uncompressed" are lost.
+ //
+ // REQUIRES: "compressed[]" is not an alias of "*uncompressed".
+ //
+ // Returns false if the message is corrupted and could not be decompressed.
+ bool Uncompress(const char* compressed, size_t compressed_length,
+ string* uncompressed);
+
+
+ // ------------------------------------------------------------------------
+ // Lower-level character array based routines. May be useful for
+ // efficiency reasons in certain circumstances.
+ // ------------------------------------------------------------------------
+
+ // REQUIRES: "compressed" must point to an area of memory that is at
+ // least "MaxCompressedLength(input_length)" bytes in length.
+ //
+ // Takes the data stored in "input[0..input_length-1]" and stores
+ // it in the array pointed to by "compressed".
+ //
+ // "*compressed_length" is set to the length of the compressed output.
+ //
+ // Example:
+ // char* output = new char[snappy::MaxCompressedLength(input_length)];
+ // size_t output_length;
+ // RawCompress(input, input_length, output, &output_length);
+ // ... Process(output, output_length) ...
+ // delete [] output;
+ void RawCompress(const char* input,
+ size_t input_length,
+ char* compressed,
+ size_t* compressed_length);
+
+ // Given data in "compressed[0..compressed_length-1]" generated by
+ // calling the snappy::Compress routine, this routine
+ // stores the uncompressed data to
+ // uncompressed[0..GetUncompressedLength(compressed,compressed_length)-1]
+ // Returns false if the message is corrupted and could not be decompressed.
+ bool RawUncompress(const char* compressed, size_t compressed_length,
+ char* uncompressed);
+
+ // Given data from the byte source 'compressed' generated by calling
+ // the snappy::Compress routine, this routine stores the uncompressed
+ // data to
+ // uncompressed[0..GetUncompressedLength(compressed)-1]
+ // Returns false if the message is corrupted and could not be decompressed.
+ bool RawUncompress(Source* compressed, char* uncompressed);
+
+ // Returns the maximal size of the compressed representation of
+ // input data that is "source_bytes" bytes in length.
+ size_t MaxCompressedLength(size_t source_bytes);
+
+ // REQUIRES: "compressed[]" was produced by RawCompress() or Compress()
+ // Returns true and stores the length of the uncompressed data in
+ // *result normally. Returns false on parsing error.
+ // This operation takes O(1) time.
+ bool GetUncompressedLength(const char* compressed, size_t compressed_length,
+ size_t* result);
+
+ // Returns true iff the contents of "compressed[]" can be uncompressed
+ // successfully. Does not return the uncompressed data. Takes
+ // time proportional to compressed_length, but is usually at least
+ // a factor of four faster than actual decompression.
+ bool IsValidCompressedBuffer(const char* compressed,
+ size_t compressed_length);
+
+ // The size of a compression block. Note that many parts of the compression
+ // code assume that kBlockSize <= 65536; in particular, the hash table
+ // can only store 16-bit offsets, and EmitCopy() also assumes the offset
+ // is 65535 bytes or less. Note also that if you change this, it will
+ // affect the framing format (see framing_format.txt).
+ //
+ // Note that there might be older data around that is compressed with larger
+ // block sizes, so the decompression code should not rely on the
+ // non-existence of long backreferences.
+ static const int kBlockLog = 16;
+ static const size_t kBlockSize = 1 << kBlockLog;
+
+ // Hash table size limit, given as a bit count and as the matching power of
+ // two (1 << 14 = 16384). Presumably this bounds the hash table mentioned
+ // above -- confirm against the implementation.
+ static const int kMaxHashTableBits = 14;
+ static const size_t kMaxHashTableSize = 1 << kMaxHashTableBits;
+
+} // end namespace snappy
+
+
+#endif // UTIL_SNAPPY_SNAPPY_H__
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "MCollectorOutputHandler.h"
+#include "NativeObjectFactory.h"
+#include "MapOutputCollector.h"
+#include "CombineHandler.h"
+
+using std::string;
+using std::vector;
+
+namespace NativeTask {
+
+// Out-of-class definitions of the static command constants declared in
+// AbstractMapHandler. Each Command is built from a numeric id and a
+// human-readable name; the ids must match what the Java side expects
+// (NOTE(review): the Java counterpart is not visible here -- confirm ids).
+const Command AbstractMapHandler::GET_OUTPUT_PATH(100, "GET_OUTPUT_PATH");
+const Command AbstractMapHandler::GET_OUTPUT_INDEX_PATH(101, "GET_OUTPUT_INDEX_PATH");
+const Command AbstractMapHandler::GET_SPILL_PATH(102, "GET_SPILL_PATH");
+const Command AbstractMapHandler::GET_COMBINE_HANDLER(103, "GET_COMBINE_HANDLER");
+} //namespace
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ABSTRACT_MAP_HANDLER_H
+#define ABSTRACT_MAP_HANDLER_H
+
+#include "NativeTask.h"
+#include "BatchHandler.h"
+#include "lib/SpillOutputService.h"
+#include "lib/Combiner.h"
+#include "CombineHandler.h"
+
+namespace NativeTask {
+
+/**
+ * Common base for map-side batch handlers. It answers the
+ * SpillOutputService queries (output path, index path, spill path, java
+ * combiner) by sending the corresponding command to the java side through
+ * BatchHandler::call() and decoding the returned buffer.
+ */
+class AbstractMapHandler : public BatchHandler, public SpillOutputService {
+public:
+  static const Command GET_OUTPUT_PATH;
+  static const Command GET_OUTPUT_INDEX_PATH;
+  static const Command GET_SPILL_PATH;
+  static const Command GET_COMBINE_HANDLER;
+
+public:
+  AbstractMapHandler() {}
+
+  virtual ~AbstractMapHandler() {}
+
+  /**
+   * Remembers the configuration pointer in the inherited _config member.
+   */
+  virtual void configure(Config * config) {
+    _config = config;
+  }
+
+  /**
+   * Asks the java side for the map output path.
+   * @return the string read from the command result, or NULL if the
+   *         command produced no result buffer
+   */
+  virtual string * getOutputPath() {
+    return callForString(GET_OUTPUT_PATH);
+  }
+
+  /**
+   * Asks the java side for the map output index path.
+   * @return the string read from the command result, or NULL if the
+   *         command produced no result buffer
+   */
+  virtual string * getOutputIndexPath() {
+    return callForString(GET_OUTPUT_INDEX_PATH);
+  }
+
+  /**
+   * Asks the java side for a spill file path.
+   * @return the string read from the command result, or NULL if the
+   *         command produced no result buffer
+   */
+  virtual string * getSpillPath() {
+    return callForString(GET_SPILL_PATH);
+  }
+
+  /**
+   * Asks the java side for the combine handler and decodes the pointer
+   * stored at the start of the result buffer.
+   * @return the combine handler, or NULL if the command produced no result
+   */
+  virtual CombineHandler * getJavaCombineHandler() {
+    LOG("[MapOutputCollector::configure] java combiner is configured");
+
+    ResultBuffer * getCombineHandlerResult = call(GET_COMBINE_HANDLER, NULL);
+    if (NULL == getCombineHandlerResult) {
+      return NULL;
+    }
+
+    getCombineHandlerResult->setReadPoint(0);
+
+    // Keep the two-step cast: the stored pointer is first treated as a
+    // BatchHandler and only then narrowed to CombineHandler.
+    CombineHandler * javaCombiner =
+        (CombineHandler *)((BatchHandler * )(getCombineHandlerResult->readPointer()));
+    delete getCombineHandlerResult;
+    return javaCombiner;
+  }
+
+private:
+  /**
+   * Shared implementation of the path getters: sends a parameterless
+   * command, reads one string from the result and releases the result
+   * buffer.
+   * NOTE(review): ownership of the returned string is presumably passed
+   * to the caller -- confirm against ResultBuffer::readString().
+   */
+  string * callForString(const Command & cmd) {
+    ResultBuffer * result = call(cmd, NULL);
+    if (NULL == result) {
+      return NULL;
+    }
+    string * value = result->readString();
+    delete result;
+    return value;
+  }
+
+};
+
+} // namespace NativeTask
+
+#endif /* ABSTRACT_MAP_HANDLER_H */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef QUICK_BUILD
+#include "org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h"
+#endif
+#include "commons.h"
+#include "jni_md.h"
+#include "jniutils.h"
+#include "BatchHandler.h"
+#include "NativeObjectFactory.h"
+
+///////////////////////////////////////////////////////////////
+// NativeBatchProcessor jni util methods
+///////////////////////////////////////////////////////////////
+
+// JNI field/method IDs of the java NativeBatchProcessor class, cached at file
+// scope. They start out NULL and are filled in by
+// Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs().
+// Note that InputBufferFieldID is looked up from the java field
+// "rawOutputBuffer" and OutputBufferFieldID from "rawInputBuffer"
+// (presumably because java's output is the native input -- confirm).
+static jfieldID InputBufferFieldID = NULL;
+static jfieldID OutputBufferFieldID = NULL;
+static jmethodID FlushOutputMethodID = NULL;
+static jmethodID FinishOutputMethodID = NULL;
+static jmethodID SendCommandToJavaMethodID = NULL;
+
+///////////////////////////////////////////////////////////////
+// BatchHandler methods
+///////////////////////////////////////////////////////////////
+
+namespace NativeTask {
+
+/**
+ * Copies a java byte array into a newly allocated ReadWriteBuffer.
+ * The buffer is sized to the array length and its write point is set to that
+ * length after the copy.
+ * @param jenv JNI environment of the current thread
+ * @param src  java byte array to copy, may be NULL
+ * @return the new buffer (to be deleted by the caller), or NULL if src is NULL
+ */
+ReadWriteBuffer * JNU_ByteArraytoReadWriteBuffer(JNIEnv * jenv, jbyteArray src) {
+  if (src == NULL) {
+    return NULL;
+  }
+
+  const jsize byteCount = jenv->GetArrayLength(src);
+  ReadWriteBuffer * copy = new ReadWriteBuffer(byteCount);
+
+  jbyte * destination = (jbyte*)copy->getBuff();
+  jenv->GetByteArrayRegion(src, 0, byteCount, destination);
+
+  copy->setWritePoint(byteCount);
+  return copy;
+}
+
+/**
+ * Copies the written part of a ReadWriteBuffer (bytes up to its write point)
+ * into a newly created java byte array.
+ * @param jenv   JNI environment of the current thread
+ * @param result buffer to copy, may be NULL
+ * @return a local reference to the new array; NULL if result is NULL or
+ *         empty, or if the array could not be allocated (in which case a
+ *         java exception is pending)
+ */
+jbyteArray JNU_ReadWriteBufferToByteArray(JNIEnv * jenv, ReadWriteBuffer * result) {
+  if (NULL == result || result->getWritePoint() == 0) {
+    return NULL;
+  }
+
+  jbyteArray ret = jenv->NewByteArray(result->getWritePoint());
+  if (NULL == ret) {
+    // NewByteArray failed and left an exception pending; writing into a
+    // NULL array would be invalid, so let the caller deal with the failure.
+    return NULL;
+  }
+  jenv->SetByteArrayRegion(ret, 0, result->getWritePoint(), (jbyte*)result->getBuff());
+  return ret;
+}
+
+/**
+ * Creates a handler that has no java processor reference and no
+ * configuration yet; both are supplied later (setProcessor / onSetup).
+ */
+BatchHandler::BatchHandler() :
+    _processor(NULL),
+    _config(NULL) {
+}
+
+/**
+ * Drops the global reference to the java processor and deletes the
+ * configuration object held in _config.
+ */
+BatchHandler::~BatchHandler() {
+  releaseProcessor();
+
+  // delete on a null pointer is a no-op, so no explicit guard is required
+  delete _config;
+  _config = NULL;
+}
+
+/**
+ * Deletes the JNI global reference stored in _processor, if any, and
+ * resets the member so that repeated calls are harmless.
+ */
+void BatchHandler::releaseProcessor() {
+  if (NULL == _processor) {
+    return;
+  }
+
+  JNIEnv * jni = JNU_GetJNIEnv();
+  jobject processorRef = (jobject)_processor;
+  jni->DeleteGlobalRef(processorRef);
+  _processor = NULL;
+}
+
+/**
+ * Rewinds the input buffer to cover [0, length) and passes it to
+ * handleInput().
+ * @param length number of valid bytes in the input buffer
+ */
+void BatchHandler::onInputData(uint32_t length) {
+  ByteBuffer & input = _in;
+  input.rewind(0, length);
+  handleInput(input);
+}
+
+/**
+ * Reports the bytes accumulated in the output buffer to the java side
+ * (flushOutput(int)) and resets the buffer position to 0.
+ * Does nothing when no output buffer is attached; skips the java call when
+ * the buffer is empty.
+ * Throws JavaException if the java method raised an exception.
+ */
+void BatchHandler::flushOutput() {
+  if (_out.base() == NULL) {
+    return;
+  }
+
+  // Capture the amount written, then reset the position before calling out.
+  const uint32_t pending = _out.position();
+  _out.position(0);
+
+  if (pending != 0) {
+    JNIEnv * jni = JNU_GetJNIEnv();
+    jni->CallVoidMethod((jobject)_processor, FlushOutputMethodID, (jint)pending);
+    if (jni->ExceptionCheck()) {
+      THROW_EXCEPTION(JavaException, "FlushOutput throw exception");
+    }
+  }
+}
+
+/**
+ * Invokes the java side finishOutput() method.
+ * Does nothing when no output buffer is attached.
+ * Throws JavaException if the java method raised an exception.
+ */
+void BatchHandler::finishOutput() {
+  const bool hasOutputBuffer = (_out.base() != NULL);
+  if (hasOutputBuffer) {
+    JNIEnv * jni = JNU_GetJNIEnv();
+    jni->CallVoidMethod((jobject)_processor, FinishOutputMethodID);
+    if (jni->ExceptionCheck()) {
+      THROW_EXCEPTION(JavaException, "FinishOutput throw exception");
+    }
+  }
+}
+
+/**
+ * Stores the configuration, binds the input and (optional) output buffers
+ * and finally calls configure().
+ * @param config               configuration, kept in _config
+ * @param inputBuffer          start of the input buffer
+ * @param inputBufferCapacity  size of the input buffer in bytes
+ * @param outputBuffer         start of the output buffer, may be NULL
+ * @param outputBufferCapacity size of the output buffer in bytes; must be
+ *                             larger than 1024 when outputBuffer is set,
+ *                             otherwise IOException is thrown
+ */
+void BatchHandler::onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity,
+    char * outputBuffer, uint32_t outputBufferCapacity) {
+  _config = config;
+  _in.reset(inputBuffer, inputBufferCapacity);
+
+  const bool hasOutputBuffer = (outputBuffer != NULL);
+  if (hasOutputBuffer) {
+    const bool largeEnough = (outputBufferCapacity > 1024);
+    if (!largeEnough) {
+      THROW_EXCEPTION(IOException, "Output buffer size too small for BatchHandler");
+    }
+
+    _out.reset(outputBuffer, outputBufferCapacity);
+    _out.rewind(0, outputBufferCapacity);
+
+    LOG("[BatchHandler::onSetup] input Capacity %d, output capacity %d", inputBufferCapacity, _out.limit());
+  }
+
+  configure(_config);
+}
+
+/**
+ * Sends a command to the java processor (sendCommandToJava(int, byte[]))
+ * and returns its answer.
+ * @param cmd   command to send; only its id is transmitted
+ * @param param optional parameter data, may be NULL
+ * @return a newly allocated buffer holding the returned bytes (to be deleted
+ *         by the caller), or NULL if java returned null
+ * Throws JavaException if a java exception is pending, either after creating
+ * the parameter array or after the java call.
+ */
+ResultBuffer * BatchHandler::call(const Command& cmd, ParameterBuffer * param) {
+  JNIEnv * env = JNU_GetJNIEnv();
+  jbyteArray jcmdData = JNU_ReadWriteBufferToByteArray(env, param);
+  if (env->ExceptionCheck()) {
+    // Creating the parameter array failed; do not call into java with an
+    // exception already pending.
+    THROW_EXCEPTION(JavaException, "Failed to create command parameter byte array");
+  }
+
+  jbyteArray ret = (jbyteArray)env->CallObjectMethod((jobject)_processor, SendCommandToJavaMethodID,
+      cmd.id(), jcmdData);
+
+  // Local references are only reclaimed when the enclosing native method
+  // returns to java; release them eagerly because call() may run many times
+  // within one native invocation. DeleteLocalRef is permitted while an
+  // exception is pending.
+  if (NULL != jcmdData) {
+    env->DeleteLocalRef(jcmdData);
+  }
+
+  if (env->ExceptionCheck()) {
+    THROW_EXCEPTION(JavaException, "SendCommandToJava throw exception");
+  }
+
+  ResultBuffer * result = JNU_ByteArraytoReadWriteBuffer(env, ret);
+  if (NULL != ret) {
+    env->DeleteLocalRef(ret);
+  }
+  return result;
+}
+
+} // namespace NativeTask
+
+///////////////////////////////////////////////////////////////
+// NativeBatchProcessor jni methods
+///////////////////////////////////////////////////////////////
+using namespace NativeTask;
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    setupHandler
+ * Signature: (J[[B)V
+ *
+ * Builds a Config from the flat key/value byte-array list "configs"
+ * (key at even index, value at the following odd index; a trailing unpaired
+ * element is ignored), looks up the direct input/output buffers of the java
+ * processor, stores a global reference to the processor in the handler and
+ * calls BatchHandler::onSetup().
+ *
+ * C++ exceptions are translated into java exceptions; a JavaException is only
+ * logged because the java exception is already pending.
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler(
+    JNIEnv * jenv, jobject processor, jlong handler, jobjectArray configs) {
+  try {
+    // Validate the handler first so that nothing is allocated (and leaked)
+    // when it is invalid.
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", "BatchHandler is null");
+      return;
+    }
+
+    NativeTask::Config * config = new NativeTask::Config();
+    try {
+      jsize len = jenv->GetArrayLength(configs);
+      for (jsize i = 0; i + 1 < len; i += 2) {
+        jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i);
+        jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1);
+        config->set(JNU_ByteArrayToString(jenv, key_obj), JNU_ByteArrayToString(jenv, val_obj));
+        // Release the per-entry local references so that a large
+        // configuration does not exhaust the local reference table.
+        if (NULL != key_obj) {
+          jenv->DeleteLocalRef(key_obj);
+        }
+        if (NULL != val_obj) {
+          jenv->DeleteLocalRef(val_obj);
+        }
+      }
+    } catch (...) {
+      // The handler has not taken the config over yet; free it and let the
+      // outer handlers translate the exception.
+      delete config;
+      throw;
+    }
+
+    jobject jinputBuffer = jenv->GetObjectField(processor, InputBufferFieldID);
+    char * inputBufferAddr = NULL;
+    uint32_t inputBufferCapacity = 0;
+    if (NULL != jinputBuffer) {
+      inputBufferAddr = (char*)(jenv->GetDirectBufferAddress(jinputBuffer));
+      inputBufferCapacity = jenv->GetDirectBufferCapacity(jinputBuffer);
+    }
+    jobject joutputBuffer = jenv->GetObjectField(processor, OutputBufferFieldID);
+    char * outputBufferAddr = NULL;
+    uint32_t outputBufferCapacity = 0;
+    if (NULL != joutputBuffer) {
+      outputBufferAddr = (char*)(jenv->GetDirectBufferAddress(joutputBuffer));
+      outputBufferCapacity = jenv->GetDirectBufferCapacity(joutputBuffer);
+    }
+    batchHandler->setProcessor(jenv->NewGlobalRef(processor));
+    // onSetup stores config in the handler before doing anything else, so
+    // from here on the handler's destructor is responsible for deleting it.
+    batchHandler->onSetup(config, inputBufferAddr, inputBufferCapacity, outputBufferAddr,
+        outputBufferCapacity);
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeProcessInput
+ * Signature: (JI)V
+ *
+ * Tells the native handler that "length" bytes of input are available by
+ * calling BatchHandler::onInputData(). C++ exceptions are translated into
+ * java exceptions; a JavaException is only logged.
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput(
+    JNIEnv * jenv, jobject processor, jlong handler, jint length) {
+  try {
+    void * rawHandler = (void*)handler;
+    NativeTask::BatchHandler * target = static_cast<NativeTask::BatchHandler *>(rawHandler);
+    if (target != NULL) {
+      target->onInputData(length);
+    } else {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+    }
+  } catch (NativeTask::UnsupportException & ex) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", ex.what());
+  } catch (NativeTask::OutOfMemoryException & ex) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", ex.what());
+  } catch (NativeTask::IOException & ex) {
+    JNU_ThrowByName(jenv, "java/io/IOException", ex.what());
+  } catch (NativeTask::JavaException & ex) {
+    // The java exception is already pending; only log on the native side.
+    LOG("JavaException: %s", ex.what());
+  } catch (std::exception & ex) {
+    JNU_ThrowByName(jenv, "java/io/IOException", ex.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeFinish
+ * Signature: (J)V
+ *
+ * Signals the end of input to the native handler by calling
+ * BatchHandler::onFinish(). C++ exceptions are translated into java
+ * exceptions; a JavaException is only logged.
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish(
+    JNIEnv * jenv, jobject processor, jlong handler) {
+  try {
+    void * rawHandler = (void*)handler;
+    NativeTask::BatchHandler * target = static_cast<NativeTask::BatchHandler *>(rawHandler);
+    if (target != NULL) {
+      target->onFinish();
+    } else {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+    }
+  } catch (NativeTask::UnsupportException & ex) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", ex.what());
+  } catch (NativeTask::OutOfMemoryException & ex) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", ex.what());
+  } catch (NativeTask::IOException & ex) {
+    JNU_ThrowByName(jenv, "java/io/IOException", ex.what());
+  } catch (NativeTask::JavaException & ex) {
+    // The java exception is already pending; only log on the native side.
+    LOG("JavaException: %s", ex.what());
+  } catch (std::exception & ex) {
+    JNU_ThrowByName(jenv, "java/io/IOException", ex.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeLoadData
+ * Signature: (J)V
+ *
+ * Calls BatchHandler::onLoadData() on the native handler. C++ exceptions are
+ * translated into java exceptions; a JavaException is only logged.
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData(
+    JNIEnv * jenv, jobject processor, jlong handler) {
+  try {
+    void * rawHandler = (void*)handler;
+    NativeTask::BatchHandler * target = static_cast<NativeTask::BatchHandler *>(rawHandler);
+    if (target != NULL) {
+      target->onLoadData();
+    } else {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+    }
+  } catch (NativeTask::UnsupportException & ex) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", ex.what());
+  } catch (NativeTask::OutOfMemoryException & ex) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", ex.what());
+  } catch (NativeTask::IOException & ex) {
+    JNU_ThrowByName(jenv, "java/io/IOException", ex.what());
+  } catch (NativeTask::JavaException & ex) {
+    // The java exception is already pending; only log on the native side.
+    LOG("JavaException: %s", ex.what());
+  } catch (std::exception & ex) {
+    JNU_ThrowByName(jenv, "java/io/IOException", ex.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeCommand
+ * Signature: (JI[B)[B
+ *
+ * Delivers a command from java to the native handler: the command id and
+ * the optional parameter bytes are passed to BatchHandler::onCall() and the
+ * result buffer, if any, is returned as a new java byte array.
+ *
+ * Returns NULL when the handler returns no (or an empty) result or when an
+ * error occurred; in the error case a java exception is pending, except for
+ * JavaException, which is only logged because the java exception is already
+ * pending.
+ */
+jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand(
+    JNIEnv * jenv, jobject processor, jlong handler, jint command, jbyteArray cmdData) {
+  try {
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+      return NULL;
+    }
+    Command cmd(command);
+    ParameterBuffer * param = JNU_ByteArraytoReadWriteBuffer(jenv, cmdData);
+    ResultBuffer * result = NULL;
+    jbyteArray ret = NULL;
+    try {
+      result = batchHandler->onCall(cmd, param);
+      ret = JNU_ReadWriteBufferToByteArray(jenv, result);
+    } catch (...) {
+      // Do not leak the buffers when the handler throws; rethrow so that the
+      // outer handlers translate the exception for java.
+      delete result;
+      delete param;
+      throw;
+    }
+
+    delete result;
+    delete param;
+    return ret;
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (const NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return NULL;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    InitIDs
+ * Signature: ()V
+ *
+ * Looks up and caches the field and method IDs of NativeBatchProcessor that
+ * the native side uses later. If a lookup fails, the JNI call leaves a java
+ * exception pending; the function then returns immediately so that no
+ * further JNI lookups are made while that exception is pending.
+ *
+ * Note the cross-over: the native input buffer is the java field
+ * "rawOutputBuffer" and the native output buffer is "rawInputBuffer"
+ * (presumably because java's output is the native input -- confirm).
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs(JNIEnv * jenv,
+    jclass processorClass) {
+  InputBufferFieldID = jenv->GetFieldID(processorClass, "rawOutputBuffer", "Ljava/nio/ByteBuffer;");
+  if (NULL == InputBufferFieldID) {
+    return;
+  }
+  OutputBufferFieldID = jenv->GetFieldID(processorClass, "rawInputBuffer", "Ljava/nio/ByteBuffer;");
+  if (NULL == OutputBufferFieldID) {
+    return;
+  }
+  FlushOutputMethodID = jenv->GetMethodID(processorClass, "flushOutput", "(I)V");
+  if (NULL == FlushOutputMethodID) {
+    return;
+  }
+  FinishOutputMethodID = jenv->GetMethodID(processorClass, "finishOutput", "()V");
+  if (NULL == FinishOutputMethodID) {
+    return;
+  }
+  SendCommandToJavaMethodID = jenv->GetMethodID(processorClass, "sendCommandToJava", "(I[B)[B");
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BATCHHANDLER_H_
+#define BATCHHANDLER_H_
+
+#include "NativeTask.h"
+#include "lib/Buffers.h"
+
+namespace NativeTask {
+
+/**
+ * Native peer of the java class NativeBatchProcessor.
+ * It wraps the input and output byte buffers shared with java, holds the
+ * (global) reference to the java processor object and offers hooks that
+ * subclasses override to process data and commands.
+ */
+class BatchHandler : public Configurable {
+protected:
+  ByteBuffer _in;      // data coming from java
+  ByteBuffer _out;     // data going to java
+  void * _processor;   // java processor reference, see setProcessor()
+  Config * _config;    // configuration, deleted by the destructor
+public:
+  BatchHandler();
+  virtual ~BatchHandler();
+
+  virtual NativeObjectType type() {
+    return BatchHandlerType;
+  }
+
+  /**
+   * Stores the java processor reference; the jni setup function passes a
+   * global reference here.
+   */
+  void setProcessor(void * processor) {
+    _processor = processor;
+  }
+
+  /**
+   * Deletes the stored processor global reference, if any.
+   */
+  void releaseProcessor();
+
+  /**
+   * Invoked from java to initialize this handler: stores the config,
+   * attaches the buffers and calls configure().
+   */
+  void onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity,
+      char * outputBuffer, uint32_t outputBufferCapacity);
+
+  /**
+   * Invoked from java when new input is available.
+   * @param length number of valid bytes in the input buffer
+   */
+  void onInputData(uint32_t length);
+
+  /**
+   * Invoked from java (nativeLoadData); no-op unless overridden.
+   */
+  virtual void onLoadData() {
+  }
+
+  /**
+   * Invoked from java when the input is exhausted; delegates to finish().
+   */
+  void onFinish() {
+    finish();
+  }
+
+  /**
+   * Invoked from java to deliver a command to this handler.
+   * The base implementation ignores every command.
+   * @param command the command
+   * @param param   command parameter data
+   * @return the command result, NULL in the base implementation
+   */
+  virtual ResultBuffer * onCall(const Command& command, ReadWriteBuffer * param) {
+    return NULL;
+  }
+
+protected:
+  /**
+   * Sends a command to the java side and returns its answer.
+   */
+  virtual ResultBuffer * call(const Command& cmd, ParameterBuffer * param);
+
+  /**
+   * For subclasses: hands the bytes written to the output buffer so far to
+   * the java side flushOutput(int length) and resets the buffer.
+   */
+  virtual void flushOutput();
+
+  /**
+   * For subclasses: invokes the java side finishOutput().
+   */
+  void finishOutput();
+
+  /**
+   * Convenience writer: copies "length" bytes from "buff" into the output
+   * buffer, flushing whenever the remaining data does not fit. Subclasses
+   * may instead fill the output buffer themselves and call flushOutput().
+   */
+  inline void output(const char * buff, uint32_t length) {
+    while (length != 0) {
+      if (_out.remain() < length) {
+        flushOutput();
+      }
+      const uint32_t room = _out.remain();
+      uint32_t chunk = room;
+      if (length < room) {
+        chunk = length;
+      }
+      simple_memcpy(_out.current(), buff, chunk);
+      _out.advance(chunk);
+      buff += chunk;
+      length -= chunk;
+    }
+  }
+
+  /**
+   * Writes a 4 byte integer (in host byte order, as stored in memory) to the
+   * output buffer, flushing first if fewer than 4 bytes remain.
+   */
+  inline void outputInt(uint32_t v) {
+    if (_out.remain() < 4) {
+      flushOutput();
+    }
+    uint32_t * slot = (uint32_t*)(_out.current());
+    *slot = v;
+    _out.advance(4);
+  }
+
+  /////////////////////////////////////////////////////////////
+  // Hooks for subclasses
+  /////////////////////////////////////////////////////////////
+
+  /**
+   * Hook called at the end of onSetup(); empty by default.
+   */
+  virtual void configure(Config * config) {
+  }
+
+  /**
+   * Hook called by onFinish(); by default flushes pending output and then
+   * signals the java side that output is complete.
+   */
+  virtual void finish() {
+    flushOutput();
+    finishOutput();
+  }
+
+  /**
+   * Hook called by onInputData() with the input buffer; empty by default.
+   */
+  virtual void handleInput(ByteBuffer & byteBuffer) {
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* BATCHHANDLER_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CombineHandler.h"
+
+namespace NativeTask {
+const char * REFILL = "refill"; // not referenced anywhere else in this file
+const int LENGTH_OF_REFILL_STRING = 6; // number of characters in "refill", terminator excluded
+
+const Command CombineHandler::COMBINE(4, "Combine"); // command that combine() passes to call(); presumably 4 is its id and "Combine" its description - confirm against Command
+
+/**
+ * Create an idle handler: no combine context, iterator, writer or config
+ * attached, key/value types unknown, no cached record, all counters zero.
+ */
+CombineHandler::CombineHandler()
+    : _combineContext(NULL), _kvIterator(NULL), _writer(NULL),
+      _kType(UnknownType), _vType(UnknownType),
+      _config(NULL), _kvCached(false),
+      _combineInputRecordCount(0), _combineInputBytes(0),
+      _combineOutputRecordCount(0), _combineOutputBytes(0) {
+  // The initializers above follow the member declaration order, which is the
+  // order they really run in (and what -Wreorder checks).
+  // SerializeInfo has no constructor, so give its length field a defined value.
+  _key.outerLength = 0;
+  _value.outerLength = 0;
+}
+
+CombineHandler::~CombineHandler() { // empty: none of the pointers held by this class are deleted here
+}
+
+/**
+ * Remember the configuration and read the map-output spec from it with
+ * MapOutputSpec::getSpecFromConfig(); the key and value types of that
+ * spec are cached in _kType and _vType.
+ */
+void CombineHandler::configure(Config * config) {
+  _config = config;
+  MapOutputSpec::getSpecFromConfig(config, _mapOutputSpec);
+  _vType = _mapOutputSpec.valueType;
+  _kType = _mapOutputSpec.keyType;
+}
+
+/**
+ * Serialize as many key/value pairs as fit into the output buffer and
+ * flush whatever is left in it at the end.
+ *
+ * Each record goes out as: byte-swapped key length, byte-swapped value
+ * length, key, value. A record that was fetched by the previous call but
+ * did not fit (_kvCached) is emitted first. The first record of a call is
+ * always emitted, even when it is larger than the space left in _out;
+ * a later record that does not fit stays in _key/_value for the next call.
+ *
+ * @return number of bytes emitted by this call, record headers included
+ */
+uint32_t CombineHandler::feedDataToJavaInWritableSerialization() {
+  uint32_t bytesOut = 0;
+  uint32_t fetched = 0;
+  bool nothingEmitted = true;
+  _out.position(0);
+
+  // The record left over from the previous round goes out first.
+  if (_kvCached) {
+    const uint32_t cachedLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
+    outputInt(bswap(_key.outerLength));
+    outputInt(bswap(_value.outerLength));
+    outputKeyOrValue(_key, _kType);
+    outputKeyOrValue(_value, _vType);
+
+    bytesOut += cachedLength;
+    nothingEmitted = false;
+    _kvCached = false;
+  }
+
+  for (;;) {
+    if (!nextKeyValue(_key, _value)) {
+      break;
+    }
+    _kvCached = false;
+    ++fetched;
+
+    const uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
+
+    // Does not fit any more: keep it for the next call.
+    if (!nothingEmitted && kvLength > _out.remain()) {
+      _kvCached = true;
+      break;
+    }
+
+    nothingEmitted = false;
+    // final key length and final value length first, then the data
+    outputInt(bswap(_key.outerLength));
+    outputInt(bswap(_value.outerLength));
+    outputKeyOrValue(_key, _kType);
+    outputKeyOrValue(_value, _vType);
+
+    bytesOut += kvLength;
+  }
+
+  if (_out.position() > 0) {
+    flushOutput();
+  }
+
+  _combineInputBytes += bytesOut;
+  _combineInputRecordCount += fetched;
+  return bytesOut;
+}
+
+/**
+ * Emit one serialized key or value (KV) according to its type:
+ *  - TextType:  the prefix bytes held in KV.varBytes (their count is
+ *               KV.outerLength - KV.buffer.length()), then the payload
+ *  - BytesType: the byte-swapped payload length as a 32-bit int, then the payload
+ *  - otherwise: the payload only
+ */
+void CombineHandler::outputKeyOrValue(SerializeInfo & KV, KeyValueType type) {
+  switch (type) {
+  case TextType:
+    output(KV.varBytes, KV.outerLength - KV.buffer.length());
+    output(KV.buffer.data(), KV.buffer.length());
+    break;
+  case BytesType:
+    outputInt(bswap(KV.buffer.length()));
+    output(KV.buffer.data(), KV.buffer.length());
+    break;
+  default:
+    output(KV.buffer.data(), KV.buffer.length());
+    break;
+  }
+}
+
+/**
+ * Set info.outerLength to the serialized size of info.buffer for the given
+ * type. For TextType the length prefix produced by WritableUtils::WriteVInt
+ * is stored in info.varBytes and its size is added; for BytesType 4 bytes
+ * are added; any other type is just the payload length.
+ */
+static void fillOuterLength(SerializeInfo & info, KeyValueType type) {
+  switch (type) {
+  case TextType: {
+    uint32_t varLength = 0;
+    WritableUtils::WriteVInt(info.buffer.length(), info.varBytes, varLength);
+    info.outerLength = info.buffer.length() + varLength;
+    break;
+  }
+  case BytesType:
+    info.outerLength = info.buffer.length() + 4;
+    break;
+  default:
+    info.outerLength = info.buffer.length();
+    break;
+  }
+}
+
+/**
+ * Fetch the next pair from _kvIterator into key.buffer / value.buffer and
+ * prepare the final (serialized) length of both.
+ *
+ * @return false when the iterator has no more pairs, true otherwise
+ */
+bool CombineHandler::nextKeyValue(SerializeInfo & key, SerializeInfo & value) {
+
+  if (!_kvIterator->next(key.buffer, value.buffer)) {
+    return false;
+  }
+
+  fillOuterLength(key, _kType);
+  fillOuterLength(value, _vType);
+
+  return true;
+}
+
+/**
+ * Emit the next batch of records using the requested serialization.
+ * Only WRITABLE_SERIALIZATION is implemented; any other value raises an
+ * IOException.
+ *
+ * @return the byte count reported by feedDataToJavaInWritableSerialization()
+ */
+uint32_t CombineHandler::feedDataToJava(SerializationFramework serializationType) {
+  if (serializationType != WRITABLE_SERIALIZATION) {
+    THROW_EXCEPTION(IOException, "Native Serialization not supported");
+  }
+  return feedDataToJavaInWritableSerialization();
+}
+
+/**
+ * Consume one input buffer of serialized key/value records.
+ *
+ * A record larger than the data at hand is collected in _asideBuffer over
+ * several calls and handed to write() once complete. Otherwise all remaining
+ * bytes are passed to write() directly.
+ *
+ * Throws IOException when data remains but is shorter than a record header.
+ */
+void CombineHandler::handleInput(ByteBuffer & in) {
+  char * pos = in.current();
+  uint32_t remain = in.remain();
+
+  // Continue a record that was split across input buffers.
+  if (_asideBuffer.remain() > 0) {
+    const uint32_t filledLength = _asideBuffer.fill(pos, remain);
+    pos += filledLength;
+    remain -= filledLength;
+  }
+
+  // The split record is complete now: write it out and drop the aside buffer.
+  if (_asideBuffer.size() > 0 && _asideBuffer.remain() == 0) {
+    _asideBuffer.position(0);
+    write(_asideBuffer.current(), _asideBuffer.size());
+    _asideBuffer.wrap(NULL, 0);
+  }
+
+  if (remain == 0) {
+    return;
+  }
+  KVBuffer * kvBuffer = (KVBuffer *)pos;
+
+  if (unlikely(remain < kvBuffer->headerLength())) {
+    THROW_EXCEPTION(IOException, "k/v meta information incomplete");
+  }
+
+  // Kept unsigned: a signed int would misrepresent lengths of 2^31 and above
+  // and made the comparison below a signed/unsigned mix.
+  const uint32_t kvLength = kvBuffer->lengthConvertEndium();
+
+  if (kvLength > remain) {
+    // Only the beginning of the record is here; park it until the rest arrives.
+    _asideBytes.resize(kvLength);
+    _asideBuffer.wrap(_asideBytes.buff(), _asideBytes.size());
+    _asideBuffer.fill(pos, remain);
+  } else {
+    write(pos, remain);
+  }
+}
+
+/**
+ * Pass every record stored in buf[0, length) to _writer.
+ *
+ * The key and value length fields of each record are byte-swapped in place
+ * before use, so the buffer content is modified. Output counters are updated
+ * with the number of records written and with length.
+ *
+ * Throws IOException when the buffer ends in the middle of a record header
+ * or a record body; previously that case made the remaining-bytes counter
+ * wrap around and the loop read past the end of the buffer.
+ */
+void CombineHandler::write(char * buf, uint32_t length) {
+  char * pos = buf;
+  uint32_t remain = length;
+  uint32_t outputRecordCount = 0;
+
+  while (remain > 0) {
+    if (unlikely(remain < KVBuffer::headerLength())) {
+      THROW_EXCEPTION(IOException, "k/v meta information incomplete");
+    }
+    KVBuffer * kv = (KVBuffer *)pos;
+    kv->keyLength = bswap(kv->keyLength);
+    kv->valueLength = bswap(kv->valueLength);
+
+    const uint32_t kvLength = kv->length();
+    if (unlikely(kvLength > remain)) {
+      THROW_EXCEPTION(IOException, "k/v data incomplete");
+    }
+
+    _writer->write(kv->getKey(), kv->keyLength, kv->getValue(), kv->valueLength);
+    outputRecordCount++;
+    remain -= kvLength;
+    pos += kvLength;
+  }
+
+  _combineOutputRecordCount += outputRecordCount;
+  _combineOutputBytes += length;
+}
+
+/**
+ * Return the 4 bytes of length, exactly as they are laid out in memory,
+ * as a 4-character string.
+ */
+string toString(uint32_t length) {
+  const char * raw = (char *)(&length);
+  return string(raw, 4);
+}
+
+void CombineHandler::onLoadData() { // load-data callback: emits the next batch of records
+ feedDataToJava(WRITABLE_SERIALIZATION); // the returned byte count is not used here
+}
+
+/**
+ * CombineHandler accepts no commands: every call raises UnsupportException.
+ * Both parameters are ignored.
+ */
+ResultBuffer * CombineHandler::onCall(const Command& command, ParameterBuffer * param) {
+  // The message used to name RReducerHandler, which pointed at the wrong class.
+  THROW_EXCEPTION(UnsupportException, "Command not supported by CombineHandler");
+}
+
+/**
+ * Run one combine pass: reset the statistics, attach the iterator that
+ * supplies the input and the writer that receives the output, issue the
+ * COMBINE command through call(), and log the resulting counters.
+ *
+ * @param type       combine context; passed by value, so it is only
+ *                   reachable through _combineContext during this call
+ * @param kvIterator source of the key/value pairs
+ * @param writer     destination of the combined records
+ */
+void CombineHandler::combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) {
+
+  _combineInputRecordCount = 0;
+  _combineOutputRecordCount = 0;
+  _combineInputBytes = 0;
+  _combineOutputBytes = 0;
+
+  this->_combineContext = &type;
+  this->_kvIterator = kvIterator;
+  this->_writer = writer;
+
+  // 'type' dies when this function returns, so the member must not keep
+  // pointing at it, whether call() returns normally or throws.
+  try {
+    call(COMBINE, NULL);
+  } catch (...) {
+    this->_combineContext = NULL;
+    throw;
+  }
+  this->_combineContext = NULL;
+
+  // The counters are uint32_t, hence %u rather than %d.
+  LOG("[CombineHandler] input Record Count: %u, input Bytes: %u, output Record Count: %u, output Bytes: %u",
+      _combineInputRecordCount, _combineInputBytes,
+      _combineOutputRecordCount, _combineOutputBytes);
+}
+
+void CombineHandler::finish() { // empty body: nothing is flushed or closed here; presumably this replaces the base-class finish() - confirm
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _COMBINEHANDLER_H_
+#define _COMBINEHANDLER_H_
+
+#include "Combiner.h"
+#include "BatchHandler.h"
+
+namespace NativeTask {
+
+enum SerializationFramework { // selects the record encoding in CombineHandler::feedDataToJava()
+ WRITABLE_SERIALIZATION = 0, // the only value feedDataToJava() accepts
+ NATIVE_SERIALIZATION = 1 // rejected by feedDataToJava() with an IOException
+};
+
+struct SerializeInfo { // one key or one value as prepared by CombineHandler::nextKeyValue()
+ Buffer buffer; // payload, filled in by KVIterator::next()
+ uint32_t outerLength; // serialized size: payload length plus the length prefix, if the type has one
+ char varBytes[8]; // length prefix written by WritableUtils::WriteVInt; only used for TextType
+};
+
+class CombineHandler : public NativeTask::ICombineRunner, public NativeTask::BatchHandler { // streams iterator records out and writes the records it receives back to an IFileWriter
+public:
+ static const Command COMBINE; // command issued by combine() through call()
+
+private:
+
+ CombineContext * _combineContext; // points at combine()'s by-value argument; only meaningful while combine() is running
+ KVIterator * _kvIterator; // input of the current combine() call; never deleted by this class
+ IFileWriter * _writer; // output of the current combine() call; never deleted by this class
+ SerializeInfo _key; // current key; also holds the cached record while _kvCached is true
+ SerializeInfo _value; // current value; also holds the cached record while _kvCached is true
+
+ KeyValueType _kType; // key type taken from _mapOutputSpec in configure()
+ KeyValueType _vType; // value type taken from _mapOutputSpec in configure()
+ MapOutputSpec _mapOutputSpec; // filled by MapOutputSpec::getSpecFromConfig() in configure()
+ Config * _config; // configuration passed to configure(); never deleted by this class
+ bool _kvCached; // true when _key/_value hold a record that did not fit into the last batch
+
+ uint32_t _combineInputRecordCount; // records fetched from _kvIterator since combine() started
+ uint32_t _combineInputBytes; // bytes emitted by feedDataToJavaInWritableSerialization() since combine() started
+
+ uint32_t _combineOutputRecordCount; // records passed to _writer since combine() started
+ uint32_t _combineOutputBytes; // bytes passed to write() since combine() started
+
+ FixSizeContainer _asideBuffer; // collects a record that arrives split across several handleInput() calls
+ ByteArray _asideBytes; // storage wrapped by _asideBuffer
+
+public:
+ CombineHandler();
+ virtual ~CombineHandler();
+
+ virtual void handleInput(ByteBuffer & byteBuffer); // consumes serialized records and forwards them to write()
+ void finish(); // empty implementation
+
+ ResultBuffer * onCall(const Command& command, ParameterBuffer * param); // always throws UnsupportException
+
+ void configure(Config * config); // stores config and caches the key/value types
+
+ void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer); // runs one combine pass and logs the counters
+
+ virtual void onLoadData(); // emits the next batch via feedDataToJava(WRITABLE_SERIALIZATION)
+
+private:
+ void flushDataToWriter(); // NOTE(review): no definition appears in CombineHandler.cc - confirm it is still needed
+ void outputKeyOrValue(SerializeInfo & info, KeyValueType type); // emits one key or value according to its type
+ bool nextKeyValue(SerializeInfo & key, SerializeInfo & value); // fetches the next pair and computes the outer lengths
+ uint32_t feedDataToJava(SerializationFramework serializationType); // dispatches on the serialization framework
+ uint32_t feedDataToJavaInWritableSerialization(); // emits one batch; returns the bytes emitted
+ void write(char * buf, uint32_t length); // passes the records in buf to _writer
+
+};
+
+} /* namespace NativeTask */
+#endif /* _COMBINEHANDLER_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "MCollectorOutputHandler.h"
+#include "NativeObjectFactory.h"
+#include "MapOutputCollector.h"
+#include "CombineHandler.h"
+
+using std::string;
+using std::vector;
+
+namespace NativeTask {
+
+MCollectorOutputHandler::MCollectorOutputHandler()
+ : _collector(NULL), _dest(NULL), _endium(LARGE_ENDIUM) { // the collector is created later by configure(); input headers are byte-swapped while _endium is LARGE_ENDIUM
+}
+
+/**
+ * Destroy the collector created by configure() and clear the pointers
+ * held by this handler.
+ */
+MCollectorOutputHandler::~MCollectorOutputHandler() {
+  delete _collector;
+  _collector = NULL;
+  _dest = NULL;
+}
+
+/**
+ * Create and configure the map output collector.
+ * The partition count is read from MAPRED_NUM_REDUCES (default 1).
+ * A NULL config leaves the handler without a collector.
+ */
+void MCollectorOutputHandler::configure(Config * config) {
+  if (config != NULL) {
+    const uint32_t partition = config->getInt(MAPRED_NUM_REDUCES, 1);
+
+    _collector = new MapOutputCollector(partition, this);
+    _collector->configure(config);
+  }
+}
+
+/**
+ * Close the collector, then run the base-class finish().
+ */
+void MCollectorOutputHandler::finish() {
+  // configure() returns early on a NULL config and then no collector exists;
+  // closing must not dereference NULL in that case.
+  if (_collector != NULL) {
+    _collector->close();
+  }
+  BatchHandler::finish();
+}
+
+/**
+ * Consume one input buffer of records, each laid out as
+ * [partition id (4 bytes)][KVBuffer header][key][value].
+ *
+ * For every record a KVBuffer of the record's length is allocated for its
+ * partition and filled from the input. A record that continues in the next
+ * input buffer stays pending in _kvContainer and is completed first on the
+ * following call. While _endium is LARGE_ENDIUM the partition id and both
+ * length fields are byte-swapped in place.
+ *
+ * Throws IOException when the input ends inside a record header.
+ */
+void MCollectorOutputHandler::handleInput(ByteBuffer & in) {
+  char * buff = in.current();
+  uint32_t length = in.remain();
+
+  const char * end = buff + length;
+  char * pos = buff;
+
+  // Finish the record left incomplete by the previous call.
+  if (_kvContainer.remain() > 0) {
+    uint32_t filledLength = _kvContainer.fill(pos, length);
+    pos += filledLength;
+  }
+
+  // The partition id precedes the KVBuffer header, so both must be present
+  // before any header field is read. Checking only KVBuffer::headerLength()
+  // allowed valueLength to be read past the end of the input.
+  const uint32_t partitionIdLength = 4;
+  const uint32_t wireHeaderLength = partitionIdLength + KVBuffer::headerLength();
+
+  while (end - pos > 0) {
+    if (unlikely((uint32_t)(end - pos) < wireHeaderLength)) {
+      THROW_EXCEPTION(IOException, "k/v meta information incomplete");
+    }
+
+    KVBufferWithParititionId * kvBuffer = (KVBufferWithParititionId *)pos;
+
+    if (_endium == LARGE_ENDIUM) {
+      kvBuffer->partitionId = bswap(kvBuffer->partitionId);
+      kvBuffer->buffer.keyLength = bswap(kvBuffer->buffer.keyLength);
+      kvBuffer->buffer.valueLength = bswap(kvBuffer->buffer.valueLength);
+    }
+
+    uint32_t kvLength = kvBuffer->buffer.length();
+
+    KVBuffer * dest = allocateKVBuffer(kvBuffer->partitionId, kvLength);
+    _kvContainer.wrap((char *)dest, kvLength);
+
+    pos += partitionIdLength; //skip the partition id
+    uint32_t filledLength = _kvContainer.fill(pos, end - pos);
+    pos += filledLength;
+  }
+}
+
+/**
+ * Ask the collector for a KVBuffer of kvlength bytes in the given partition.
+ */
+KVBuffer * MCollectorOutputHandler::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) {
+  return _collector->allocateKVBuffer(partitionId, kvlength);
+}
+
+} //namespace
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MCOLLECTOROUTPUTHANDLER_H_
+#define MCOLLECTOROUTPUTHANDLER_H_
+
+#include "BatchHandler.h"
+#include "lib/SpillOutputService.h"
+#include "AbstractMapHandler.h"
+
+namespace NativeTask {
+class MapOutputCollector;
+
+class MCollectorOutputHandler : public AbstractMapHandler { // copies incoming partitioned records into buffers obtained from a MapOutputCollector
+private:
+
+ FixSizeContainer _kvContainer; // wraps the KVBuffer being filled; keeps a partially received record across handleInput() calls
+
+ MapOutputCollector * _collector; // created in configure(), deleted in the destructor
+ // state info for large KV pairs
+ char * _dest; // only ever set to NULL in MCollectorOutputHandler.cc
+
+ Endium _endium; // handleInput() byte-swaps the record header fields while this is LARGE_ENDIUM
+
+public:
+ MCollectorOutputHandler();
+ virtual ~MCollectorOutputHandler();
+
+ virtual void configure(Config * config); // creates and configures _collector; does nothing for a NULL config
+ virtual void finish(); // closes the collector, then calls BatchHandler::finish()
+ virtual void handleInput(ByteBuffer & byteBuffer); // consumes [partition id][KVBuffer] records
+private:
+ KVBuffer * allocateKVBuffer(uint32_t partition, uint32_t kvlength); // forwards to _collector->allocateKVBuffer()
+};
+
+}
+
+#endif /* MCOLLECTOROUTPUTHANDLER_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,54 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_hadoop_mapred_nativetask_NativeBatchProcessor */
+
+#ifndef _Included_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+#define _Included_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+#ifdef __cplusplus
+extern "C" {
+#endif
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method: setupHandler
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler(
+ JNIEnv *, jobject, jlong, jobjectArray configs);
+
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method: nativeProcessInput
+ * Signature: (JI)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput(
+ JNIEnv *, jobject, jlong, jint);
+
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method: nativeFinish
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish(
+ JNIEnv *, jobject, jlong);
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method: nativeCommand
+ * Signature: (J[B)[B
+ */JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand(
+ JNIEnv *, jobject, jlong, jint, jbyteArray);
+
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method: nativeLoadData
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData(
+ JNIEnv *, jobject, jlong);
+
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method: InitIDs
+ * Signature: ()V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs(
+ JNIEnv *, jclass);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "BufferStream.h"
+
+namespace NativeTask {
+
+/**
+ * Wrap stream with a read buffer of bufferSize bytes.
+ *
+ * If the buffer cannot be allocated the failure is logged and the capacity
+ * stays 0; read() then forwards every request straight to the wrapped stream.
+ */
+BufferedInputStream::BufferedInputStream(InputStream * stream, uint32_t bufferSize)
+    : FilterInputStream(stream), _buff(NULL), _position(0), _limit(0), _capacity(0) {
+  _buff = (char*)malloc(bufferSize);
+  // The test used to be inverted: the failure message was logged on success
+  // and nothing at all was reported on failure.
+  if (NULL == _buff) {
+    LOG("[BuferStream] malloc failed when create BufferedInputStream with buffersize %u",
+        bufferSize);
+  } else {
+    _capacity = bufferSize;
+  }
+}
+
+/**
+ * Free the read buffer, if there is one, and reset the bookkeeping fields.
+ */
+BufferedInputStream::~BufferedInputStream() {
+  if (_buff == NULL) {
+    return;
+  }
+  free(_buff);
+  _buff = NULL;
+  _capacity = 0;
+  _limit = 0;
+  _position = 0;
+}
+
+/**
+ * Seek the wrapped stream to position.
+ * Seeking is refused with an IOException while buffered bytes are unread.
+ */
+void BufferedInputStream::seek(uint64_t position) {
+  const bool hasUnreadData = (_limit != _position);
+  if (hasUnreadData) {
+    THROW_EXCEPTION(IOException, "temporary buffered data exists when fseek()");
+  }
+  _stream->seek(position);
+}
+
+/**
+ * Position of the wrapped stream minus the bytes that are buffered but not
+ * yet handed out.
+ */
+uint64_t BufferedInputStream::tell() {
+  const uint32_t unread = _limit - _position;
+  return _stream->tell() - unread;
+}
+
+/**
+ * Read up to length bytes into buff.
+ *
+ *  - Buffered bytes are served first; at most what is buffered is returned.
+ *  - With an empty buffer, a request of at least half the capacity is passed
+ *    straight to the wrapped stream.
+ *  - Otherwise the buffer is refilled until it holds at least half its
+ *    capacity or the stream stops delivering, and the request is served from it.
+ *
+ * @return number of bytes copied, the wrapped stream's result for direct
+ *         reads, or -1 when a refill yields no data
+ */
+int32_t BufferedInputStream::read(void * buff, uint32_t length) {
+  uint32_t rest = _limit - _position;
+  if (rest > 0) {
+    // have some data in buffer, read from buffer
+    uint32_t cp = rest < length ? rest : length;
+    memcpy(buff, _buff + _position, cp);
+    _position += cp;
+    return cp;
+  } else if (length >= _capacity / 2) {
+    // dest buffer big enough, read to dest buffer directly
+    return _stream->read(buff, length);
+  } else {
+    // read to buffer first, then copy part of it to dest
+    // Both markers restart at 0, so an empty refill cannot leave _position
+    // ahead of _limit.
+    _position = 0;
+    _limit = 0;
+    do {
+      int32_t rd = _stream->read(_buff + _limit, _capacity - _limit);
+      if (rd <= 0) {
+        break;
+      }
+      // Account for the bytes just read. Without this the loop kept
+      // overwriting the start of the buffer and never reported any data.
+      _limit += rd;
+    } while (_limit < _capacity / 2);
+    if (_limit == 0) {
+      return -1;
+    }
+    uint32_t cp = _limit < length ? _limit : length;
+    memcpy(buff, _buff, cp);
+    _position = cp;
+    return cp;
+  }
+}
+
+/////////////////////////////////////////////////////////////////
+
+/**
+ * Create a write buffer of bufferSize bytes (allocated with
+ * sizeof(uint64_t) bytes of slack).
+ *
+ * If the buffer cannot be allocated the failure is logged and the capacity
+ * stays 0; write() then forwards every request straight to the wrapped stream.
+ *
+ * NOTE(review): the base class is initialized from its own, still
+ * uninitialized, _stream member and the 'stream' parameter is never used.
+ * The parameter is also declared as InputStream *. Fixing this needs the
+ * declaration in BufferStream.h and this definition to change together
+ * (presumably to OutputStream * stream / FilterOutputStream(stream)).
+ */
+BufferedOutputStream::BufferedOutputStream(InputStream * stream, uint32_t bufferSize)
+    : FilterOutputStream(_stream), _buff(NULL), _position(0), _capacity(0) {
+  _buff = (char*)malloc(bufferSize + sizeof(uint64_t));
+  // The test used to be inverted: the failure message was logged on success
+  // and nothing at all was reported on failure.
+  if (NULL == _buff) {
+    LOG("[BuferStream] malloc failed when create BufferedOutputStream with buffersize %u",
+        bufferSize);
+  } else {
+    _capacity = bufferSize;
+  }
+}
+
+/**
+ * Free the write buffer, if there is one, and reset the bookkeeping fields.
+ * Bytes still buffered are not written out here.
+ */
+BufferedOutputStream::~BufferedOutputStream() {
+  if (_buff == NULL) {
+    return;
+  }
+  free(_buff);
+  _buff = NULL;
+  _capacity = 0;
+  _position = 0;
+}
+
+/**
+ * Position of the wrapped stream plus the bytes still waiting in the buffer.
+ */
+uint64_t BufferedOutputStream::tell() {
+  const uint64_t written = _stream->tell();
+  return written + _position;
+}
+
+/**
+ * Write length bytes from buff.
+ *
+ * Requests of at least half the capacity bypass the buffer: pending bytes
+ * are flushed and the data goes straight to the wrapped stream. Smaller
+ * requests are appended to the buffer, which is flushed first when the free
+ * space is not larger than the request.
+ */
+void BufferedOutputStream::write(const void * buff, uint32_t length) {
+  if (length >= _capacity / 2) {
+    flush();
+    _stream->write(buff, length);
+    return;
+  }
+
+  const uint32_t freeSpace = _capacity - _position;
+  if (length >= freeSpace) {
+    flush();
+    simple_memcpy(_buff, buff, length);
+    _position = length;
+    return;
+  }
+
+  simple_memcpy(_buff + _position, buff, length);
+  _position += length;
+}
+
+/**
+ * Hand the buffered bytes to the wrapped stream and empty the buffer.
+ * Does nothing when the buffer is empty.
+ */
+void BufferedOutputStream::flush() {
+  if (_position == 0) {
+    return;
+  }
+  _stream->write(_buff, _position);
+  _position = 0;
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Copy up to length bytes from the current position into buff and advance.
+ *
+ * @return number of bytes copied; 0 for a zero-length request; -1 when
+ *         bytes were requested but none are left
+ */
+int32_t InputBuffer::read(void * buff, uint32_t length) {
+  const uint32_t available = _capacity - _position;
+  const uint32_t rd = (length <= available) ? length : available;
+  if (rd == 0) {
+    return (length == 0) ? 0 : -1;
+  }
+  memcpy(buff, _buff + _position, rd);
+  _position += rd;
+  return rd;
+}
+
+/**
+ * Append length bytes from buff at the current position.
+ * Throws IOException when the bytes do not fit into the remaining space;
+ * nothing is written in that case.
+ */
+void OutputBuffer::write(const void * buff, uint32_t length) {
+  // Compare against the free space instead of computing _position + length:
+  // that sum can wrap around in 32 bits and wrongly pass the capacity check.
+  if (_position > _capacity || length > _capacity - _position) {
+    THROW_EXCEPTION(IOException, "OutputBuffer too small to write");
+  }
+  memcpy(_buff + _position, buff, length);
+  _position += length;
+}
+
+} // namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BUFFERSTREAM_H_
+#define BUFFERSTREAM_H_
+
+#include <string>
+#include "Streams.h"
+
+namespace NativeTask {
+
+using std::string;
+
+class BufferedInputStream : public FilterInputStream { // input stream wrapper that reads through an internal buffer
+protected:
+ char * _buff; // buffer allocated with malloc() in the constructor and freed in the destructor
+ uint32_t _position; // offset of the next unread byte in _buff
+ uint32_t _limit; // number of valid bytes in _buff
+ uint32_t _capacity; // usable size of _buff; stays 0 when the allocation failed
+public:
+ BufferedInputStream(InputStream * stream, uint32_t bufferSize = 64 * 1024);
+
+ virtual ~BufferedInputStream();
+
+ virtual void seek(uint64_t position); // throws IOException while unread bytes are buffered
+
+ virtual uint64_t tell(); // wrapped stream position minus the unread buffered bytes
+
+ virtual int32_t read(void * buff, uint32_t length); // returns the bytes read, or -1 when a refill yields no data
+};
+
+class BufferedOutputStream : public FilterOutputStream { // output stream wrapper that collects small writes in an internal buffer
+protected:
+ char * _buff; // buffer allocated with malloc() in the constructor and freed in the destructor
+ uint32_t _position; // number of bytes currently buffered
+ uint32_t _capacity; // usable size of _buff; stays 0 when the allocation failed
+
+public:
+ BufferedOutputStream(InputStream * stream, uint32_t bufferSize = 64 * 1024); // NOTE(review): takes an InputStream * although this is an output stream - confirm the intended type
+
+ virtual ~BufferedOutputStream();
+
+ virtual uint64_t tell(); // wrapped stream position plus the buffered bytes
+
+ virtual void write(const void * buff, uint32_t length); // requests of at least half the capacity bypass the buffer
+
+ virtual void flush(); // passes buffered bytes to the wrapped stream
+
+};
+
+/**
+ * InputStream that reads from a byte range owned by the caller.
+ * Only the pointer is stored: the bytes are neither copied nor freed here,
+ * so they have to stay valid for as long as the buffer is read.
+ */
+class InputBuffer : public InputStream {
+protected:
+  const char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+public:
+  /** Empty buffer; attach data with reset(). */
+  InputBuffer()
+      : _buff(NULL), _position(0), _capacity(0) {
+  }
+
+  /** Read from buff[0, capacity). */
+  InputBuffer(const char * buff, uint32_t capacity)
+      : _buff(buff), _position(0), _capacity(capacity) {
+  }
+
+  /** Read from the bytes of src; src itself is not copied. */
+  InputBuffer(const string & src)
+      : _buff(src.data()), _position(0), _capacity(src.length()) {
+  }
+
+  virtual ~InputBuffer() {
+  }
+
+  /** Move the read position; positions beyond the end are clamped to the end. */
+  virtual void seek(uint64_t position) {
+    if (position > _capacity) {
+      _position = _capacity;
+      return;
+    }
+    _position = position;
+  }
+
+  /** Current read position. */
+  virtual uint64_t tell() {
+    return _position;
+  }
+
+  virtual int32_t read(void * buff, uint32_t length);
+
+  /** Switch to another byte range and start reading at its beginning. */
+  void reset(const char * buff, uint32_t capacity) {
+    _buff = buff;
+    _capacity = capacity;
+    _position = 0;
+  }
+
+  /** Switch to the bytes of src and start reading at their beginning. */
+  void reset(const string & src) {
+    _buff = src.data();
+    _capacity = src.length();
+    _position = 0;
+  }
+
+  /** Start reading from the beginning again. */
+  void rewind() {
+    _position = 0;
+  }
+};
+
+/**
+ * OutputStream that writes into a fixed-size byte range owned by the caller.
+ * Only the pointer is stored; the memory is not freed here.
+ */
+class OutputBuffer : public OutputStream {
+protected:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+public:
+  /** Buffer without storage; attach memory with reset(). */
+  OutputBuffer()
+      : _buff(NULL), _position(0), _capacity(0) {
+  }
+
+  /** Write into buff[0, capacity). */
+  OutputBuffer(char * buff, uint32_t capacity)
+      : _buff(buff), _position(0), _capacity(capacity) {
+  }
+
+  virtual ~OutputBuffer() {
+  }
+
+  /** Number of bytes written so far. */
+  virtual uint64_t tell() {
+    return _position;
+  }
+
+  virtual void write(const void * buff, uint32_t length);
+
+  /** Discard what has been written; the storage stays attached. */
+  void clear() {
+    _position = 0;
+  }
+
+  /** Switch to another byte range and start writing at its beginning. */
+  void reset(char * buff, uint32_t capacity) {
+    _buff = buff;
+    _capacity = capacity;
+    _position = 0;
+  }
+
+  /** Copy of the bytes written so far. */
+  string getString() {
+    string written(_buff, _position);
+    return written;
+  }
+};
+
+/**
+ * OutputStream that appends everything written to a string owned by the
+ * caller. The target is held by pointer and is NULL after default
+ * construction. No member checks for NULL, so reset() has to supply a
+ * target before any other call in that case.
+ */
+class OutputStringStream : public OutputStream {
+protected:
+  string * _dest;
+public:
+  /** Stream without a target; attach one with reset(). */
+  OutputStringStream()
+      : _dest(NULL) {
+  }
+
+  /** Append to dest. */
+  OutputStringStream(string & dest)
+      : _dest(&dest) {
+  }
+
+  virtual ~OutputStringStream() {
+  }
+
+  /** Current length of the target string. */
+  virtual uint64_t tell() {
+    const string & target = *_dest;
+    return target.length();
+  }
+
+  /** Append length bytes from buff to the target string. */
+  virtual void write(const void * buff, uint32_t length) {
+    const char * bytes = (const char *)buff;
+    _dest->append(bytes, length);
+  }
+
+  /** Switch to another target string. */
+  void reset(string * dest) {
+    _dest = dest;
+  }
+
+  /** Empty the target string. */
+  void clear() {
+    _dest->clear();
+  }
+
+  /** Copy of the target string. */
+  string getString() {
+    string copy(*_dest);
+    return copy;
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* BUFFERSTREAM_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "util/WritableUtils.h"
+#include "Buffers.h"
+
+namespace NativeTask {
+
+/** Creates an empty buffer with no backing storage. */
+DynamicBuffer::DynamicBuffer()
+  : _data(NULL),
+    _capacity(0),
+    _size(0),
+    _used(0) {
+}
+
+/** Creates a buffer and immediately reserves capacity bytes of storage. */
+DynamicBuffer::DynamicBuffer(uint32_t capacity)
+  : _data(NULL),
+    _capacity(0),
+    _size(0),
+    _used(0) {
+  this->reserve(capacity);
+}
+
+/** Frees the backing storage, if any. */
+DynamicBuffer::~DynamicBuffer() {
+  this->release();
+}
+
+/**
+ * Frees the backing storage and returns the buffer to its empty state.
+ * Does nothing when no storage is allocated.
+ *
+ * All three counters are cleared together: leaving _size behind would make
+ * size()/remain() report data that no longer exists and would make
+ * freeSpace() (_capacity - _size) wrap around.
+ */
+void DynamicBuffer::release() {
+  if (_data != NULL) {
+    free(_data);
+    _data = NULL;
+    _capacity = 0;
+    _size = 0;
+    _used = 0;
+  }
+}
+
+/**
+ * Makes sure at least capacity bytes of storage are allocated.
+ *
+ * With no storage yet, a fresh block is allocated and the size/used counters
+ * start at zero. With existing storage the block is only ever grown (via
+ * realloc, so its contents and counters are kept); a request that is not
+ * larger than the current capacity is ignored.
+ *
+ * Throws OutOfMemoryException when the allocation fails.
+ */
+void DynamicBuffer::reserve(uint32_t capacity) {
+  if (_data == NULL) {
+    char * fresh = (char*)malloc(capacity);
+    if (fresh == NULL) {
+      THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve new %u failed", capacity);
+    }
+    _data = fresh;
+    _capacity = capacity;
+    _size = 0;
+    _used = 0;
+    return;
+  }
+
+  if (capacity <= _capacity) {
+    return;  // already large enough; the buffer never shrinks
+  }
+
+  char * grown = (char*)realloc(_data, capacity);
+  if (grown == NULL) {
+    THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve realloc %u failed",
+        capacity);
+  }
+  _data = grown;
+  _capacity = capacity;
+}
+
+/**
+ * Reads from stream into the free space behind the valid data.
+ *
+ * Returns whatever stream->read() returned; the valid size only grows when
+ * that value is positive. Throws IOException when there is no storage or no
+ * free space left.
+ */
+int32_t DynamicBuffer::refill(InputStream * stream) {
+  const bool hasRoom = (_data != NULL) && (freeSpace() != 0);
+  if (!hasRoom) {
+    THROW_EXCEPTION(IOException, "refill DynamicBuffer failed, no space left");
+  }
+  char * tail = _data + _size;
+  const int32_t got = stream->read(tail, freeSpace());
+  if (got > 0) {
+    _size += got;
+  }
+  return got;
+}
+
+/**
+ * Discards the consumed prefix: the unconsumed bytes are moved to the start
+ * of the storage and the used counter goes back to zero.
+ */
+void DynamicBuffer::cleanUsed() {
+  if (_used == 0) {
+    return;  // nothing consumed, nothing to compact
+  }
+  const uint32_t pending = _size - _used;
+  if (pending != 0) {
+    // source and destination may overlap, hence memmove
+    memmove(_data, _data + _used, pending);
+  }
+  _size = pending;
+  _used = 0;
+}
+
+///////////////////////////////////////////////////////////
+
+/** Creates an uninitialized read buffer; init() sets up storage and source. */
+ReadBuffer::ReadBuffer()
+  : _buff(NULL),
+    _remain(0),
+    _size(0),
+    _capacity(0),
+    _stream(NULL),
+    _source(NULL) {
+}
+
+/**
+ * Allocates size bytes of storage and attaches the buffer to stream.
+ *
+ * When codec is non-empty, reads go through a decompression stream obtained
+ * from Compressions that wraps stream; otherwise they go to stream directly.
+ *
+ * Throws UnsupportException for size < 1024 or an unsupported codec, and
+ * OutOfMemoryException when the storage cannot be allocated.
+ */
+void ReadBuffer::init(uint32_t size, InputStream * stream, const string & codec) {
+  if (size < 1024) {
+    THROW_EXCEPTION_EX(UnsupportException, "ReadBuffer size %u not support.", size);
+  }
+
+  _buff = (char *)malloc(size);
+  if (_buff == NULL) {
+    THROW_EXCEPTION(OutOfMemoryException, "create append buffer");
+  }
+  _capacity = size;
+  _size = 0;
+  _remain = 0;
+
+  _stream = stream;
+  _source = stream;
+  if (codec.empty()) {
+    return;  // uncompressed: read straight from the underlying stream
+  }
+
+  if (!Compressions::support(codec)) {
+    THROW_EXCEPTION(UnsupportException, "compression codec not support");
+  }
+  _source = Compressions::getDecompressionStream(codec, _stream, size);
+}
+
+/**
+ * Deletes the decompression stream when one was created by init() (that is,
+ * when _source differs from _stream) and frees the storage. The underlying
+ * stream itself is not deleted.
+ */
+ReadBuffer::~ReadBuffer() {
+  if (_source != _stream) {
+    delete _source;
+    _source = NULL;
+  }
+  if (_buff == NULL) {
+    return;
+  }
+  free(_buff);
+  _buff = NULL;
+  _capacity = 0;
+  _remain = 0;
+  _size = 0;
+}
+
+/**
+ * Slow path of get(): makes count contiguous bytes available and returns a
+ * pointer to them, consuming them from the buffer.
+ *
+ * If count exceeds the capacity, a larger block (at least double the old
+ * capacity) replaces the storage. Otherwise the unread bytes are moved to the
+ * front of the existing storage. The buffer is then topped up from _source
+ * until count bytes are available.
+ *
+ * Throws OutOfMemoryException if the larger block cannot be allocated and
+ * IOException if the source ends before count bytes are available.
+ */
+char * ReadBuffer::fillGet(uint32_t count) {
+  if (unlikely(count > _capacity)) {
+    uint32_t newcap = _capacity * 2 > count ? _capacity * 2 : count;
+    char * newbuff = (char*)malloc(newcap);
+    if (newbuff == NULL) {
+      THROW_EXCEPTION(OutOfMemoryException,
+          StringUtil::Format("buff realloc failed, size=%u", newcap));
+    }
+
+    if (_remain > 0) {
+      // distinct allocations, so a plain memcpy is safe here
+      memcpy(newbuff, current(), _remain);
+    }
+    if (NULL != _buff) {
+      free(_buff);
+    }
+
+    _buff = newbuff;
+    _capacity = newcap;
+  } else if (_remain > 0) {
+    // Compacting in place: the unread tail and the front of the buffer
+    // overlap whenever more than half of the valid data is still unread,
+    // which memcpy does not permit.
+    memmove(_buff, current(), _remain);
+  }
+
+  _size = _remain;
+  while (_remain < count) {
+    int32_t rd = _source->read(_buff + _size, _capacity - _size);
+    if (rd <= 0) {
+      THROW_EXCEPTION(IOException, "read reach EOF");
+    }
+    _remain += rd;
+    _size += rd;
+  }
+
+  char * ret = current();
+  _remain -= count;
+  return ret;
+}
+
+/**
+ * Slow path of read(): hands out whatever is still buffered, then asks
+ * _source->readFully() for the rest directly into buff.
+ *
+ * Returns the number of bytes placed in buff. A negative result from
+ * readFully() is passed through only when nothing was buffered; otherwise the
+ * buffered byte count is returned.
+ */
+int32_t ReadBuffer::fillRead(char * buff, uint32_t len) {
+  const uint32_t buffered = _remain;
+  if (buffered > 0) {
+    memcpy(buff, current(), buffered);
+    _remain = 0;
+  }
+  // TODO: read to buffer first
+  const int32_t got = _source->readFully(buff + buffered, len - buffered);
+  if (got >= 0) {
+    return got + buffered;
+  }
+  if (buffered == 0) {
+    return got;
+  }
+  return buffered;
+}
+
+/**
+ * Slow path of readVLong(): refills the buffer when it is empty and decodes
+ * one variable-length integer.
+ *
+ * A first byte >= -112 is the value itself. Otherwise the first byte encodes
+ * the sign and the total encoded length (first byte included); the following
+ * bytes hold the magnitude, most significant byte first, and the result is
+ * bit-inverted for negative values.
+ *
+ * Throws IOException when the source has no more data.
+ */
+int64_t ReadBuffer::fillReadVLong() {
+  if (_remain == 0) {
+    const int32_t got = _source->read(_buff, _capacity);
+    if (got <= 0) {
+      THROW_EXCEPTION(IOException, "fillReadVLong reach EOF");
+    }
+    _remain = got;
+    _size = got;
+  }
+
+  const int8_t head = *(int8_t*)current();
+  if (head >= -112) {
+    _remain--;
+    return (int64_t)head;
+  }
+
+  const bool negative = head < -120;
+  const uint32_t total = negative ? (-119 - head) : (-111 - head);
+
+  // get() may refill or reallocate, so only its return value is used from
+  // here on; bytes[0] is the header byte that was just inspected.
+  const uint8_t * bytes = (const uint8_t*)get(total);
+  uint64_t magnitude = 0;
+  for (uint32_t i = 1; i < total; i++) {
+    magnitude = (magnitude << 8) | bytes[i];
+  }
+  return negative ? (magnitude ^ -1LL) : magnitude;
+}
+
+///////////////////////////////////////////////////////////
+
+/** Creates an uninitialized append buffer; init() sets up storage and sink. */
+AppendBuffer::AppendBuffer()
+  : _buff(NULL),
+    _remain(0),
+    _capacity(0),
+    _counter(0),
+    _stream(NULL),
+    _dest(NULL),
+    _compression(false) {
+}
+
+/**
+ * Allocates the staging storage (size bytes plus 8 spare bytes) and attaches
+ * the buffer to stream.
+ *
+ * When codec is non-empty, output goes through a compression stream obtained
+ * from Compressions that wraps stream; otherwise it goes to stream directly.
+ *
+ * Throws UnsupportException for size < 1024 or an unsupported codec, and
+ * OutOfMemoryException when the storage cannot be allocated.
+ */
+void AppendBuffer::init(uint32_t size, OutputStream * stream, const string & codec) {
+  if (size < 1024) {
+    THROW_EXCEPTION_EX(UnsupportException, "AppendBuffer size %u not support.", size);
+  }
+
+  _buff = (char *)malloc(size + 8);
+  if (_buff == NULL) {
+    THROW_EXCEPTION(OutOfMemoryException, "create append buffer");
+  }
+  _capacity = size;
+  _remain = size;
+
+  _stream = stream;
+  _dest = stream;
+  if (codec.empty()) {
+    return;  // uncompressed: write straight to the underlying stream
+  }
+
+  if (!Compressions::support(codec)) {
+    THROW_EXCEPTION(UnsupportException, "compression codec not support");
+  }
+  _dest = Compressions::getCompressionStream(codec, _stream, size);
+  _compression = true;
+}
+
+/**
+ * Returns the compression stream created by init(), or NULL when the buffer
+ * writes uncompressed output.
+ */
+CompressStream * AppendBuffer::getCompressionStream() {
+  if (!_compression) {
+    return NULL;
+  }
+  return (CompressStream *)_dest;
+}
+
+/**
+ * Deletes the compression stream when one was created by init() (that is,
+ * when _dest differs from _stream) and frees the staging storage. Pending
+ * bytes are not flushed here, and the underlying stream is not deleted.
+ */
+AppendBuffer::~AppendBuffer() {
+  if (_dest != _stream) {
+    delete _dest;
+    _dest = NULL;
+  }
+  if (_buff == NULL) {
+    return;
+  }
+  free(_buff);
+  _buff = NULL;
+  _remain = 0;
+  _capacity = 0;
+}
+
+/**
+ * Writes the staged bytes to the destination stream, adds them to the byte
+ * counter and marks the whole staging area as free again.
+ */
+void AppendBuffer::flushd() {
+  const uint32_t pending = _capacity - _remain;
+  _dest->write(_buff, pending);
+  _counter += pending;
+  _remain = _capacity;
+}
+
+/**
+ * Slow path of write(): the data did not fit into the free space.
+ *
+ * The staged bytes are flushed first. Data of at least half the capacity is
+ * then passed to the destination stream directly (and counted); smaller data
+ * is staged at the start of the now empty buffer.
+ */
+void AppendBuffer::write_inner(const void * data, uint32_t len) {
+  flushd();
+  const bool stage = len < _capacity / 2;
+  if (stage) {
+    simple_memcpy(_buff, data, len);
+    _remain -= len;
+    return;
+  }
+  _dest->write(data, len);
+  _counter += len;
+}
+
+/**
+ * Slow path of write_vlong(): flushes when fewer than 9 bytes are free, then
+ * lets WritableUtils::WriteVLong encode v at the current position and
+ * consumes the number of bytes it reports.
+ */
+void AppendBuffer::write_vlong_inner(int64_t v) {
+  const bool tight = _remain < 9;
+  if (tight) {
+    flushd();
+  }
+  uint32_t written;
+  WritableUtils::WriteVLong(v, current(), written);
+  _remain -= written;
+}
+
+/**
+ * Slow path of write_vuint2(): flushes when fewer than 10 bytes are free,
+ * then encodes v1 followed by v2 with WritableUtils::WriteVLong, consuming
+ * the reported number of bytes after each value.
+ */
+void AppendBuffer::write_vuint2_inner(uint32_t v1, uint32_t v2) {
+  const bool tight = _remain < 10;
+  if (tight) {
+    flushd();
+  }
+  uint32_t written;
+
+  WritableUtils::WriteVLong(v1, current(), written);
+  _remain -= written;
+
+  WritableUtils::WriteVLong(v2, current(), written);
+  _remain -= written;
+}
+
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BUFFERS_H_
+#define BUFFERS_H_
+
+#include "Streams.h"
+#include "Compressions.h"
+#include "Constants.h"
+
+namespace NativeTask {
+
+/**
+ * Byte buffer with three marks over one block of storage:
+ * [0, used) has been consumed, [used, size) is valid unconsumed data and
+ * [size, capacity) is free space.
+ */
+class DynamicBuffer {
+protected:
+  char * _data;        // backing storage
+  uint32_t _capacity;  // bytes available at _data
+  uint32_t _size;      // end of the valid data
+  uint32_t _used;      // end of the consumed prefix
+
+public:
+  DynamicBuffer();
+  DynamicBuffer(uint32_t capacity);
+  ~DynamicBuffer();
+
+  void reserve(uint32_t capacity);
+  void release();
+
+  /** Bytes available in the backing storage. */
+  uint32_t capacity() { return _capacity; }
+
+  /** Start of the backing storage. */
+  char * data() { return _data; }
+
+  /** Offset of the end of the valid data. */
+  uint32_t size() { return _size; }
+
+  /** Offset of the end of the consumed prefix. */
+  uint32_t used() { return _used; }
+
+  /** First unconsumed byte. */
+  char * current() { return _data + _used; }
+
+  /** One past the last valid byte. */
+  char * end() { return _data + _size; }
+
+  /** Number of valid bytes not yet consumed. */
+  uint32_t remain() { return _size - _used; }
+
+  /** Number of bytes that can still be appended. */
+  uint32_t freeSpace() { return _capacity - _size; }
+
+  /** Marks count more bytes as consumed (no bounds check). */
+  void use(uint32_t count) { _used += count; }
+
+  void cleanUsed();
+
+  int32_t refill(InputStream * stream);
+};
+
+/**
+ * A lightweight read buffer that acts as a buffered input stream.
+ *
+ * The inline methods serve requests from the buffered bytes; when those are
+ * not sufficient they fall back to the fill*() slow paths.
+ */
+class ReadBuffer {
+protected:
+  char * _buff;        // backing storage
+  uint32_t _remain;    // buffered bytes not yet handed out
+  uint32_t _size;      // end of the buffered data inside _buff
+  uint32_t _capacity;  // bytes available at _buff
+
+  InputStream * _stream;  // stream passed to init()
+  InputStream * _source;  // stream actually read from (may wrap _stream)
+
+protected:
+  /** First buffered byte that has not been handed out yet. */
+  inline char * current() {
+    return _buff + _size - _remain;
+  }
+
+  char * fillGet(uint32_t count);
+  int32_t fillRead(char * buff, uint32_t len);
+  int64_t fillReadVLong();
+public:
+  ReadBuffer();
+
+  void init(uint32_t size, InputStream * stream, const string & codec);
+
+  ~ReadBuffer();
+
+  /**
+   * Returns a pointer to count contiguous bytes inside the buffer and
+   * consumes them. Intended for small objects that are read in place.
+   */
+  inline char * get(uint32_t count) {
+    if (likely(count <= _remain)) {
+      char * ret = current();
+      _remain -= count;
+      return ret;
+    }
+    return fillGet(count);
+  }
+
+  /**
+   * Copies up to len bytes into the caller's buffer and returns the result
+   * of the copy: len on the fast path, otherwise whatever fillRead() returns.
+   */
+  inline int32_t read(char * buff, uint32_t len) {
+    if (likely(len <= _remain)) {
+      memcpy(buff, current(), len);
+      _remain -= len;
+      return len;
+    }
+    return fillRead(buff, len);
+  }
+
+  /**
+   * Same as read() but copies with simple_memcpy on the fast path and does
+   * not report how many bytes were read.
+   */
+  inline void readUnsafe(char * buff, uint32_t len) {
+    if (likely(len <= _remain)) {
+      simple_memcpy(buff, current(), len);
+      _remain -= len;
+      return;
+    }
+    fillRead(buff, len);
+  }
+
+  /**
+   * Reads one variable-length long. Values encoded in a single byte
+   * (>= -112) are decoded inline; everything else goes to fillReadVLong().
+   */
+  inline int64_t readVLong() {
+    if (likely(_remain > 0)) {
+      // Read the byte as int8_t: plain char is unsigned on some platforms,
+      // which would turn the negative single-byte values into 144..255.
+      const int8_t first = *(int8_t*)current();
+      if (first >= (int8_t)-112) {
+        _remain--;
+        return (int64_t)first;
+      }
+    }
+    return fillReadVLong();
+  }
+
+  /**
+   * Reads 4 bytes as a uint32_t in host byte order (which is little endian
+   * only on little-endian hosts).
+   */
+  inline uint32_t read_uint32_le() {
+    // The bytes may sit at any offset inside the buffer, so copy them out
+    // instead of dereferencing a possibly misaligned uint32_t pointer.
+    uint32_t value;
+    memcpy(&value, get(4), sizeof(value));
+    return value;
+  }
+
+  /**
+   * Reads 4 bytes as a uint32_t and byte-swaps the host-order value.
+   */
+  inline uint32_t read_uint32_be() {
+    return bswap(read_uint32_le());
+  }
+};
+
+/**
+ * A lightweight append buffer, used as a buffered output stream.
+ *
+ * Bytes are staged in _buff and handed to the destination stream by
+ * flushd(). The inline methods cover the case where the data fits into the
+ * free space; the *_inner() methods are the slow paths.
+ */
+class AppendBuffer {
+protected:
+  char * _buff;        // staging storage
+  uint32_t _remain;    // free bytes left in the staging storage
+  uint32_t _capacity;  // usable size of the staging storage
+  uint64_t _counter;   // bytes handed to the destination stream so far
+
+  OutputStream * _stream;  // stream passed to init()
+  OutputStream * _dest;    // stream actually written to (may wrap _stream)
+  bool _compression;       // true when _dest is a compression stream
+
+protected:
+  void flushd();
+
+  /** Position at which the next byte is staged. */
+  inline char * current() {
+    return _buff + _capacity - _remain;
+  }
+
+  void write_inner(const void * data, uint32_t len);
+  void write_vlong_inner(int64_t v);
+  void write_vuint2_inner(uint32_t v1, uint32_t v2);
+public:
+  AppendBuffer();
+
+  ~AppendBuffer();
+
+  void init(uint32_t size, OutputStream * stream, const string & codec);
+
+  CompressStream * getCompressionStream();
+
+  /** Bytes handed to the destination stream so far (staged bytes excluded). */
+  uint64_t getCounter() {
+    return _counter;
+  }
+
+  /**
+   * Returns a position with at least len free bytes, flushing first if
+   * needed, or NULL when len exceeds the capacity. Nothing is consumed;
+   * the caller reports what it wrote through useUnsafe().
+   */
+  inline char * borrowUnsafe(uint32_t len) {
+    if (likely(_remain >= len)) {
+      return current();
+    }
+    if (likely(_capacity >= len)) {
+      flushd();
+      return _buff;
+    }
+    return NULL;
+  }
+
+  /** Marks count bytes as staged (no bounds check). */
+  inline void useUnsafe(uint32_t count) {
+    _remain -= count;
+  }
+
+  /** Appends a single byte. */
+  inline void write(char c) {
+    if (unlikely(_remain == 0)) {
+      flushd();
+    }
+    *current() = c;
+    _remain--;
+  }
+
+  /** Appends len bytes starting at data. */
+  inline void write(const void * data, uint32_t len) {
+    if (likely(len <= _remain)) { // append directly
+      simple_memcpy(current(), data, len);
+      _remain -= len;
+      return;
+    }
+    write_inner(data, len);
+  }
+
+  /**
+   * Appends v as 4 bytes in host byte order (little endian only on
+   * little-endian hosts).
+   */
+  inline void write_uint32_le(uint32_t v) {
+    if (unlikely(4 > _remain)) {
+      flushd();
+    }
+    // current() can be at any byte offset, so store through memcpy rather
+    // than through a possibly misaligned uint32_t pointer.
+    memcpy(current(), &v, sizeof(v));
+    _remain -= 4;
+  }
+
+  /** Appends the byte-swapped form of v. */
+  inline void write_uint32_be(uint32_t v) {
+    write_uint32_le(bswap(v));
+  }
+
+  /**
+   * Appends v as 8 bytes in host byte order (little endian only on
+   * little-endian hosts).
+   */
+  inline void write_uint64_le(uint64_t v) {
+    if (unlikely(8 > _remain)) {
+      flushd();
+    }
+    // see write_uint32_le(): avoid a misaligned, type-punned store
+    memcpy(current(), &v, sizeof(v));
+    _remain -= 8;
+  }
+
+  /** Appends the byte-swapped form of v. */
+  inline void write_uint64_be(uint64_t v) {
+    write_uint64_le(bswap64(v));
+  }
+
+  /**
+   * Appends v as a variable-length long. Values in [-112, 127] take one byte
+   * and are written inline.
+   */
+  inline void write_vlong(int64_t v) {
+    if (likely(_remain > 0 && v <= 127 && v >= -112)) {
+      *(char*)current() = (char)v;
+      _remain--;
+      return;
+    }
+    write_vlong_inner(v);
+  }
+
+  /**
+   * Appends v as a variable-length value. Values up to 127 take one byte and
+   * are written inline.
+   */
+  inline void write_vuint(uint32_t v) {
+    if (likely(_remain > 0 && v <= 127)) {
+      *(char*)current() = (char)v;
+      _remain--;
+      return;
+    }
+    write_vlong_inner(v);
+  }
+
+  /**
+   * Appends v1 followed by v2 as variable-length values; the inline path
+   * handles the case where both fit in one byte each.
+   */
+  inline void write_vuint2(uint32_t v1, uint32_t v2) {
+    if (likely(_remain >= 2 && v1 <= 127 && v2 <= 127)) {
+      *(char*)current() = (char)v1;
+      *(char*)(current() + 1) = (char)v2;
+      _remain -= 2;
+      return;
+    }
+    write_vuint2_inner(v1, v2);
+  }
+
+  /**
+   * Hands any staged bytes to the destination stream and empties the
+   * staging area; does nothing when no bytes are staged.
+   */
+  inline void flush() {
+    if (_remain < _capacity) {
+      flushd();
+    }
+  }
+};
+
+/**
+ * In-memory key/value record with its content stored inline, so a record can
+ * be copied or dumped to a file as one block of bytes.
+ *
+ * Layout: keyLength, valueLength, then keyLength key bytes immediately
+ * followed by valueLength value bytes. content[1] is only a placeholder for
+ * that variable-length tail.
+ */
+struct KVBuffer {
+  uint32_t keyLength;
+  uint32_t valueLength;
+  char content[1];
+
+  /** Start of the key bytes. */
+  char * getKey() {
+    return content;
+  }
+
+  /** Start of the value bytes (directly behind the key). */
+  char * getValue() {
+    return content + keyLength;
+  }
+
+  /** Address directly behind this record's value bytes. */
+  KVBuffer * next() {
+    return ((KVBuffer*)(content + keyLength + valueLength));
+  }
+
+  /** Key and value joined by a tab, for debugging output. */
+  std::string str() {
+    return std::string(content, keyLength) + "\t" + std::string(getValue(), valueLength);
+  }
+
+  /** Total record length: both length fields plus key and value bytes. */
+  uint32_t length() {
+    return keyLength + valueLength + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+  }
+
+  /**
+   * Same as length(), for a record whose two length fields are stored in the
+   * opposite byte order from the host.
+   */
+  uint32_t lengthConvertEndium() {
+    // Copy the two adjacent 32-bit fields into a fixed-width 64-bit value.
+    // Reading them through a `long *` breaks where long is 32 bits wide
+    // and is a strict-aliasing violation everywhere.
+    uint64_t raw;
+    memcpy(&raw, this, sizeof(raw));
+    // Swapping all 8 bytes leaves one corrected length in each 32-bit half.
+    const uint64_t swapped = bswap64(raw);
+    const uint32_t upper = (uint32_t)(swapped >> 32);
+    const uint32_t lower = (uint32_t)swapped;
+    return upper + lower + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+  }
+
+  /**
+   * Stores the lengths and copies key and value into the inline content.
+   * The caller must have provided enough memory behind this header.
+   */
+  void fill(const void * key, uint32_t keylen, const void * value, uint32_t vallen) {
+    keyLength = keylen;
+    valueLength = vallen;
+
+    if (keylen > 0) {
+      simple_memcpy(getKey(), key, keylen);
+    }
+    if (vallen > 0) {
+      simple_memcpy(getValue(), value, vallen);
+    }
+  }
+
+  /** Size of the two length fields that precede the content. */
+  static uint32_t headerLength() {
+    return SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+  }
+};
+
+/**
+ * A KVBuffer record prefixed with the id of the partition it belongs to.
+ */
+struct KVBufferWithParititionId {
+  uint32_t partitionId;
+  KVBuffer buffer;
+
+  /** Size of the partition field plus SIZE_OF_KV_LENGTH. */
+  inline static uint32_t minLength() {
+    return SIZE_OF_PARTITION_LENGTH + SIZE_OF_KV_LENGTH;
+  }
+
+  /** Length of the record plus the 4-byte partition id. */
+  int length() {
+    return 4 + buffer.length();
+  }
+
+  /** Like length(), using the record's byte-swapped length fields. */
+  int lengthConvertEndium() {
+    return 4 + buffer.lengthConvertEndium();
+  }
+};
+
+/**
+ * Native side abstraction of a Java ByteBuffer: a non-owning view over a
+ * block of memory with a position, a limit and a capacity.
+ */
+class ByteBuffer {
+private:
+  char * _buff;        // viewed memory; not owned, never freed here
+  uint32_t _limit;     // end of the readable region
+  uint32_t _position;  // current offset
+  uint32_t _capacity;  // size of the viewed memory
+
+public:
+  ByteBuffer()
+      : _buff(NULL), _limit(0), _position(0), _capacity(0) {
+  }
+
+  ~ByteBuffer() {
+  }
+
+  /** Points the view at buff and clears position and limit. */
+  void reset(char * buff, uint32_t inputCapacity) {
+    this->_buff = buff;
+    this->_capacity = inputCapacity;
+    this->_position = 0;
+    this->_limit = 0;
+  }
+
+  int capacity() {
+    return this->_capacity;
+  }
+
+  /** Bytes between the position and the limit. */
+  int remain() {
+    return _limit - _position;
+  }
+
+  int limit() {
+    return _limit;
+  }
+
+  /** Moves the position by positionOffset and returns the new position. */
+  int advance(int positionOffset) {
+    _position += positionOffset;
+    return _position;
+  }
+
+  int position() {
+    return this->_position;
+  }
+
+  void position(int newPos) {
+    this->_position = newPos;
+  }
+
+  /**
+   * Sets position and limit together. Throws IOException when newLimit
+   * exceeds the capacity; in that case the buffer is left untouched.
+   */
+  void rewind(int newPos, int newLimit) {
+    // Validate before touching any state, so a rejected call cannot leave
+    // a new position paired with the old limit. The comparison is done
+    // unsigned, so a negative limit is rejected as well.
+    if ((uint32_t)newLimit > this->_capacity) {
+      THROW_EXCEPTION(IOException, "length larger than input buffer capacity");
+    }
+    this->_position = newPos;
+    this->_limit = newLimit;
+  }
+
+  /** Address of the byte at the current position. */
+  char * current() {
+    return _buff + _position;
+  }
+
+  /** Start of the viewed memory. */
+  char * base() {
+    return _buff;
+  }
+};
+
+/**
+ * Owning, resizable scratch array of bytes.
+ *
+ * Growing the array discards its previous contents. The class owns a raw
+ * allocation and defines no copy operations, so instances must not be copied.
+ */
+class ByteArray {
+private:
+  char * _buff;        // owned storage, NULL while nothing is allocated
+  uint32_t _length;    // logical size reported by size()
+  uint32_t _capacity;  // bytes allocated at _buff
+
+public:
+  ByteArray()
+      : _buff(NULL), _length(0), _capacity(0) {
+  }
+
+  ~ByteArray() {
+    if (NULL != _buff) {
+      delete[] _buff;
+      _buff = NULL;
+    }
+    _length = 0;
+    _capacity = 0;
+  }
+
+  /**
+   * Sets the logical size to newSize.
+   *
+   * If newSize fits into the current allocation the storage is reused and
+   * its contents are kept. Otherwise the old storage is dropped and a new
+   * block of twice newSize is allocated; its contents are uninitialized.
+   * If that allocation throws, the array is left empty but consistent.
+   */
+  void resize(uint32_t newSize) {
+    if (newSize <= _capacity) {
+      _length = newSize;
+      return;
+    }
+
+    // Drop the old block and record that fact before allocating, so that a
+    // failed allocation cannot leave a capacity without storage behind it.
+    if (NULL != _buff) {
+      delete[] _buff;
+      _buff = NULL;
+    }
+    _capacity = 0;
+    _length = 0;
+
+    // Doubling must not wrap around: a wrapped capacity would be smaller
+    // than the size the caller is about to use.
+    const uint32_t newCapacity = (newSize > 0x7FFFFFFFU) ? 0xFFFFFFFFU : 2 * newSize;
+    _buff = new char[newCapacity];
+    _capacity = newCapacity;
+    _length = newSize;
+  }
+
+  /** Start of the storage (NULL before the first growing resize()). */
+  char * buff() {
+    return _buff;
+  }
+
+  /** Logical size set by the last resize(). */
+  uint32_t size() {
+    return _length;
+  }
+};
+
+/**
+ * Non-owning, fixed-size region of memory that is filled front to back.
+ */
+class FixSizeContainer {
+private:
+  char * _buff;    // wrapped memory; not owned, never freed here
+  uint32_t _pos;   // offset at which the next fill() writes
+  uint32_t _size;  // size of the wrapped memory
+
+public:
+  FixSizeContainer()
+      : _buff(NULL), _pos(0), _size(0) {
+  }
+
+  ~FixSizeContainer() {
+  }
+
+  /** Wraps buff (size bytes) and moves the fill position to the start. */
+  void wrap(char * buff, uint32_t size) {
+    _size = size;
+    _buff = buff;
+    _pos = 0;
+  }
+
+  /** Moves the fill position back to the start. */
+  void rewind() {
+    _pos = 0;
+  }
+
+  /** Bytes between the fill position and the end of the region. */
+  uint32_t remain() {
+    return _size - _pos;
+  }
+
+  /** Address of the fill position. */
+  char * current() {
+    return _buff + _pos;
+  }
+
+  /** Start of the wrapped memory. */
+  char * base() {
+    return _buff;
+  }
+
+  uint32_t size() {
+    return _size;
+  }
+
+  /**
+   * Copies at most maxSize bytes from source to the fill position and
+   * advances it. Returns the number of bytes actually copied, which is 0
+   * when the position is at or beyond the end of the region.
+   */
+  uint32_t fill(const char * source, uint32_t maxSize) {
+    if (_pos >= _size) {
+      return 0;
+    }
+    // Kept unsigned: a signed int would turn a gap larger than INT_MAX into
+    // a negative number and make a nearly empty region look full.
+    const uint32_t room = _size - _pos;
+    const uint32_t length = (maxSize < room) ? maxSize : room;
+    simple_memcpy(_buff + _pos, source, length);
+    _pos += length;
+    return length;
+  }
+
+  uint32_t position() {
+    return _pos;
+  }
+
+  void position(int pos) {
+    _pos = pos;
+  }
+};
+
+/**
+ * Growable byte buffer with independent read and write cursors, used to
+ * serialize integers, strings and pointers in host byte order.
+ *
+ * Writes grow the storage as needed. Reads perform no bounds checks: the
+ * caller must read back exactly what was written. The class owns a raw
+ * allocation and defines no copy operations, so instances must not be copied.
+ */
+class ReadWriteBuffer {
+private:
+
+  static const uint32_t INITIAL_LENGTH = 16;
+
+  uint32_t _readPoint;    // offset of the next read
+  uint32_t _writePoint;   // offset of the next write
+  char * _buff;           // storage, NULL until something is allocated
+  uint32_t _buffLength;   // bytes allocated at _buff
+  bool _newCreatedBuff;   // true when _buff was allocated by this object
+
+public:
+
+  /** Creates a buffer with length bytes pre-allocated (none for 0). */
+  ReadWriteBuffer(uint32_t length)
+      : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) {
+    _buffLength = length;
+    if (_buffLength > 0) {
+      _buff = new char[_buffLength];
+      _newCreatedBuff = true;
+    }
+  }
+
+  /** Creates a buffer without storage; the first write allocates it. */
+  ReadWriteBuffer()
+      : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) {
+  }
+
+  ~ReadWriteBuffer() {
+    if (_newCreatedBuff) {
+      delete[] _buff;
+      _buff = NULL;
+    }
+  }
+
+  void setReadPoint(uint32_t pos) {
+    _readPoint = pos;
+  }
+
+  void setWritePoint(uint32_t pos) {
+    _writePoint = pos;
+  }
+
+  char * getBuff() {
+    return _buff;
+  }
+
+  uint32_t getWritePoint() {
+    return _writePoint;
+  }
+
+  uint32_t getReadPoint() {
+    return _readPoint;
+  }
+
+  /** Appends param as 4 bytes in host byte order. */
+  void writeInt(uint32_t param) {
+    checkWriteSpaceAndResizeIfNecessary(4);
+    // the write cursor can be at any offset, so store through memcpy
+    // instead of a possibly misaligned uint32_t pointer
+    memcpy(_buff + _writePoint, &param, sizeof(param));
+    _writePoint += 4;
+  }
+
+  /** Appends param as 8 bytes in host byte order. */
+  void writeLong(uint64_t param) {
+    checkWriteSpaceAndResizeIfNecessary(8);
+    memcpy(_buff + _writePoint, &param, sizeof(param));
+    _writePoint += 8;
+  }
+
+  /** Appends a 4-byte length followed by length bytes from param. */
+  void writeString(const char * param, uint32_t length) {
+    writeInt(length);
+    checkWriteSpaceAndResizeIfNecessary(length);
+
+    if (length > 0) {
+      memcpy(_buff + _writePoint, param, length);
+    }
+    _writePoint += length;
+  }
+
+  /** Appends *param in the same format as writeString(const char *, uint32_t). */
+  void writeString(std::string * param) {
+    const char * str = param->c_str();
+    int length = param->size();
+    writeString(str, length);
+  }
+
+  /** Appends the address held by param as an 8-byte integer. */
+  void writePointer(void * param) {
+    uint64_t written = (uint64_t)(param);
+    writeLong(written);
+  }
+
+  /** Reads 4 bytes at the read cursor as a host-order uint32_t. */
+  uint32_t readInt() {
+    uint32_t result;
+    memcpy(&result, _buff + _readPoint, sizeof(result));
+    _readPoint += 4;
+    return result;
+  }
+
+  /** Reads 8 bytes at the read cursor as a host-order uint64_t. */
+  uint64_t readLong() {
+    uint64_t result;
+    memcpy(&result, _buff + _readPoint, sizeof(result));
+    _readPoint += 8;
+    return result;
+  }
+
+  /**
+   * Reads a length-prefixed string. The returned std::string is allocated
+   * with new and must be deleted by the caller.
+   */
+  std::string * readString() {
+    uint32_t len = readInt();
+    char * strBegin = _buff + _readPoint;
+    _readPoint += len;
+    return new std::string(strBegin, len);
+  }
+
+  /** Reads an 8-byte integer and returns it as an address. */
+  void * readPointer() {
+    uint64_t result = readLong();
+    return (void *)(result);
+  }
+
+private:
+  /**
+   * Makes sure toBeWritten bytes fit behind the write cursor, replacing the
+   * storage with a larger block when they do not.
+   *
+   * The new block is at least twice the old one, so a long series of small
+   * writes copies the data a logarithmic instead of a linear number of
+   * times. The members are only updated after the allocation succeeded.
+   */
+  void checkWriteSpaceAndResizeIfNecessary(uint32_t toBeWritten) {
+    // The cursor is compared first: setWritePoint() can move it past the
+    // end, and the subtraction would then wrap around.
+    if (_buff != NULL && _writePoint <= _buffLength
+        && _buffLength - _writePoint >= toBeWritten) {
+      return;
+    }
+
+    uint32_t newLength = _writePoint + toBeWritten;
+    if (newLength < 2 * _buffLength) {
+      newLength = 2 * _buffLength;
+    }
+    if (newLength < INITIAL_LENGTH) {
+      newLength = INITIAL_LENGTH;
+    }
+
+    char * newBuff = new char[newLength];
+    const uint32_t keep = (_writePoint < _buffLength) ? _writePoint : _buffLength;
+    if (_buff != NULL && keep > 0) {
+      memcpy(newBuff, _buff, keep);
+    }
+    delete[] _buff;
+
+    _buff = newBuff;
+    _buffLength = newLength;
+    _newCreatedBuff = true;
+  }
+};
+
+typedef ReadWriteBuffer ParameterBuffer;
+typedef ReadWriteBuffer ResultBuffer;
+
+} // namespace NativeTask
+
+#endif /* BUFFERS_H_ */