You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC

svn commit: r1611413 [9/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,163 @@
+// Copyright 2005 and onwards Google Inc.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// A light-weight compression algorithm.  It is designed for speed of
+// compression and decompression, rather than for the utmost in space
+// savings.
+//
+// For getting better compression ratios when you are compressing data
+// with long repeated sequences or compressing data that is similar to
+// other data, while still compressing fast, you might look at first
+// using BMDiff and then compressing the output of BMDiff with
+// Snappy.
+
+#ifndef UTIL_SNAPPY_SNAPPY_H__
+#define UTIL_SNAPPY_SNAPPY_H__
+
+#include <stddef.h>
+#include <string>
+
+#include "snappy-stubs-public.h"
+
+namespace snappy {
+  class Source;
+  class Sink;
+
+  // ------------------------------------------------------------------------
+  // Generic compression/decompression routines.
+  // ------------------------------------------------------------------------
+
+  // Compress the bytes read from "*source" and append to "*sink". Return the
+  // number of bytes written.
+  size_t Compress(Source* source, Sink* sink);
+
+  // Find the uncompressed length of the given stream, as given by the header.
+  // Note that the true length could deviate from this; the stream could e.g.
+  // be truncated.
+  //
+  // Also note that this leaves "*source" in a state that is unsuitable for
+  // further operations, such as RawUncompress(). You will need to rewind
+  // or recreate the source yourself before attempting any further calls.
+  bool GetUncompressedLength(Source* source, uint32* result);
+
+  // ------------------------------------------------------------------------
+  // Higher-level string based routines (should be sufficient for most users)
+  // ------------------------------------------------------------------------
+
+  // Sets "*output" to the compressed version of "input[0,input_length-1]".
+  // Original contents of *output are lost.
+  //
+  // REQUIRES: "input[]" is not an alias of "*output".
+  size_t Compress(const char* input, size_t input_length, string* output);
+
+  // Decompresses "compressed[0,compressed_length-1]" to "*uncompressed".
+  // Original contents of "*uncompressed" are lost.
+  //
+  // REQUIRES: "compressed[]" is not an alias of "*uncompressed".
+  //
+  // returns false if the message is corrupted and could not be decompressed
+  bool Uncompress(const char* compressed, size_t compressed_length,
+                  string* uncompressed);
+
+
+  // ------------------------------------------------------------------------
+  // Lower-level character array based routines.  May be useful for
+  // efficiency reasons in certain circumstances.
+  // ------------------------------------------------------------------------
+
+  // REQUIRES: "compressed" must point to an area of memory that is at
+  // least "MaxCompressedLength(input_length)" bytes in length.
+  //
+  // Takes the data stored in "input[0..input_length]" and stores
+  // it in the array pointed to by "compressed".
+  //
+  // "*compressed_length" is set to the length of the compressed output.
+  //
+  // Example:
+  //    char* output = new char[snappy::MaxCompressedLength(input_length)];
+  //    size_t output_length;
+  //    RawCompress(input, input_length, output, &output_length);
+  //    ... Process(output, output_length) ...
+  //    delete [] output;
+  void RawCompress(const char* input,
+                   size_t input_length,
+                   char* compressed,
+                   size_t* compressed_length);
+
+  // Given data in "compressed[0..compressed_length-1]" generated by
+  // calling the Snappy::Compress routine, this routine
+  // stores the uncompressed data to
+  //    uncompressed[0..GetUncompressedLength(compressed)-1]
+  // returns false if the message is corrupted and could not be decrypted
+  bool RawUncompress(const char* compressed, size_t compressed_length,
+                     char* uncompressed);
+
+  // Given data from the byte source 'compressed' generated by calling
+  // the Snappy::Compress routine, this routine stores the uncompressed
+  // data to
+  //    uncompressed[0..GetUncompressedLength(compressed,compressed_length)-1]
+  // returns false if the message is corrupted and could not be decrypted
+  bool RawUncompress(Source* compressed, char* uncompressed);
+
+  // Returns the maximal size of the compressed representation of
+  // input data that is "source_bytes" bytes in length;
+  size_t MaxCompressedLength(size_t source_bytes);
+
+  // REQUIRES: "compressed[]" was produced by RawCompress() or Compress()
+  // Returns true and stores the length of the uncompressed data in
+  // *result normally.  Returns false on parsing error.
+  // This operation takes O(1) time.
+  bool GetUncompressedLength(const char* compressed, size_t compressed_length,
+                             size_t* result);
+
+  // Returns true iff the contents of "compressed[]" can be uncompressed
+  // successfully.  Does not return the uncompressed data.  Takes
+  // time proportional to compressed_length, but is usually at least
+  // a factor of four faster than actual decompression.
+  bool IsValidCompressedBuffer(const char* compressed,
+                               size_t compressed_length);
+
+  // The size of a compression block. Note that many parts of the compression
+  // code assumes that kBlockSize <= 65536; in particular, the hash table
+  // can only store 16-bit offsets, and EmitCopy() also assumes the offset
+  // is 65535 bytes or less. Note also that if you change this, it will
+  // affect the framing format (see framing_format.txt).
+  //
+  // Note that there might be older data around that is compressed with larger
+  // block sizes, so the decompression code should not rely on the
+  // non-existence of long backreferences.
+  static const int kBlockLog = 16;
+  static const size_t kBlockSize = 1 << kBlockLog;
+
+  static const int kMaxHashTableBits = 14;
+  static const size_t kMaxHashTableSize = 1 << kMaxHashTableBits;
+
+}  // end namespace snappy
+
+
+#endif  // UTIL_SNAPPY_SNAPPY_H__

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "MCollectorOutputHandler.h"
+#include "NativeObjectFactory.h"
+#include "MapOutputCollector.h"
+#include "CombineHandler.h"
+
+using std::string;
+using std::vector;
+
+namespace NativeTask {
+
+const Command AbstractMapHandler::GET_OUTPUT_PATH(100, "GET_OUTPUT_PATH");
+const Command AbstractMapHandler::GET_OUTPUT_INDEX_PATH(101, "GET_OUTPUT_INDEX_PATH");
+const Command AbstractMapHandler::GET_SPILL_PATH(102, "GET_SPILL_PATH");
+const Command AbstractMapHandler::GET_COMBINE_HANDLER(103, "GET_COMBINE_HANDLER");
+}      //namespace

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ABSTRACT_MAP_HANDLER_H
+#define ABSTRACT_MAP_HANDLER_H
+
+#include "NativeTask.h"
+#include "BatchHandler.h"
+#include "lib/SpillOutputService.h"
+#include "lib/Combiner.h"
+#include "CombineHandler.h"
+
+namespace NativeTask {
+
+class AbstractMapHandler : public BatchHandler,  public SpillOutputService {
+public:
+  static const Command GET_OUTPUT_PATH;
+  static const Command GET_OUTPUT_INDEX_PATH;
+  static const Command GET_SPILL_PATH;
+  static const Command GET_COMBINE_HANDLER;
+
+public:
+  AbstractMapHandler() {}
+
+  virtual ~AbstractMapHandler() {}
+
+  virtual void configure(Config * config) {
+    _config = config;
+  }
+
+  virtual string * getOutputPath() {
+    ResultBuffer * outputPathResult = call(GET_OUTPUT_PATH, NULL);
+    if (NULL == outputPathResult) {
+      return NULL;
+    }
+    string * outputPath = outputPathResult->readString();
+
+    delete outputPathResult;
+    return outputPath;
+  }
+
+  virtual string * getOutputIndexPath() {
+
+    ResultBuffer * outputIndexPath = call(GET_OUTPUT_INDEX_PATH, NULL);
+    if (NULL == outputIndexPath) {
+      return NULL;
+    }
+    string * indexpath = outputIndexPath->readString();
+    delete outputIndexPath;
+    return indexpath;
+  }
+
+
+  virtual string * getSpillPath() {
+    ResultBuffer * spillPathBuffer = call(GET_SPILL_PATH, NULL);
+    if (NULL == spillPathBuffer) {
+      return NULL;
+    }
+    string * spillpath = spillPathBuffer->readString();
+    delete spillPathBuffer;
+    return spillpath;
+  }
+
+  virtual CombineHandler * getJavaCombineHandler() {
+
+    LOG("[MapOutputCollector::configure] java combiner is configured");
+
+    ResultBuffer * getCombineHandlerResult = call(GET_COMBINE_HANDLER, NULL);
+    if (NULL != getCombineHandlerResult) {
+
+      getCombineHandlerResult->setReadPoint(0);
+
+      CombineHandler * javaCombiner = (CombineHandler *)((BatchHandler * )(getCombineHandlerResult->readPointer()));
+      delete getCombineHandlerResult;
+      return javaCombiner;
+    }
+
+
+
+    return NULL;
+  }
+
+};
+
+} // namespace NativeTask
+
+#endif /* MMAPPERHANDLER_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef QUICK_BUILD
+#include "org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h"
+#endif
+#include "commons.h"
+#include "jni_md.h"
+#include "jniutils.h"
+#include "BatchHandler.h"
+#include "NativeObjectFactory.h"
+
+///////////////////////////////////////////////////////////////
+// NativeBatchProcessor jni util methods
+///////////////////////////////////////////////////////////////
+
+static jfieldID InputBufferFieldID = NULL;
+static jfieldID OutputBufferFieldID = NULL;
+static jmethodID FlushOutputMethodID = NULL;
+static jmethodID FinishOutputMethodID = NULL;
+static jmethodID SendCommandToJavaMethodID = NULL;
+
+///////////////////////////////////////////////////////////////
+// BatchHandler methods
+///////////////////////////////////////////////////////////////
+
+namespace NativeTask {
+
+ReadWriteBuffer * JNU_ByteArraytoReadWriteBuffer(JNIEnv * jenv, jbyteArray src) {
+  if (NULL == src) {
+    return NULL;
+  }
+  jsize len = jenv->GetArrayLength(src);
+
+  ReadWriteBuffer * ret = new ReadWriteBuffer(len);
+  jenv->GetByteArrayRegion(src, 0, len, (jbyte*)ret->getBuff());
+  ret->setWritePoint(len);
+  return ret;
+}
+
+jbyteArray JNU_ReadWriteBufferToByteArray(JNIEnv * jenv, ReadWriteBuffer * result) {
+  if (NULL == result || result->getWritePoint() == 0) {
+    return NULL;
+  }
+
+  jbyteArray ret = jenv->NewByteArray(result->getWritePoint());
+  jenv->SetByteArrayRegion(ret, 0, result->getWritePoint(), (jbyte*)result->getBuff());
+  return ret;
+}
+
+BatchHandler::BatchHandler()
+    : _processor(NULL), _config(NULL) {
+}
+
+BatchHandler::~BatchHandler() {
+  releaseProcessor();
+  if (NULL != _config) {
+    delete _config;
+    _config = NULL;
+  }
+}
+
+void BatchHandler::releaseProcessor() {
+  if (_processor != NULL) {
+    JNIEnv * env = JNU_GetJNIEnv();
+    env->DeleteGlobalRef((jobject)_processor);
+    _processor = NULL;
+  }
+}
+
+void BatchHandler::onInputData(uint32_t length) {
+  _in.rewind(0, length);
+  handleInput(_in);
+}
+
+void BatchHandler::flushOutput() {
+
+  if (NULL == _out.base()) {
+    return;
+  }
+
+  uint32_t length = _out.position();
+  _out.position(0);
+
+  if (length == 0) {
+    return;
+  }
+
+  JNIEnv * env = JNU_GetJNIEnv();
+  env->CallVoidMethod((jobject)_processor, FlushOutputMethodID, (jint)length);
+  if (env->ExceptionCheck()) {
+    THROW_EXCEPTION(JavaException, "FlushOutput throw exception");
+  }
+}
+
+void BatchHandler::finishOutput() {
+  if (NULL == _out.base()) {
+    return;
+  }
+  JNIEnv * env = JNU_GetJNIEnv();
+  env->CallVoidMethod((jobject)_processor, FinishOutputMethodID);
+  if (env->ExceptionCheck()) {
+    THROW_EXCEPTION(JavaException, "FinishOutput throw exception");
+  }
+}
+
+void BatchHandler::onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity,
+    char * outputBuffer, uint32_t outputBufferCapacity) {
+  this->_config = config;
+  _in.reset(inputBuffer, inputBufferCapacity);
+  if (NULL != outputBuffer) {
+    if (outputBufferCapacity <= 1024) {
+      THROW_EXCEPTION(IOException, "Output buffer size too small for BatchHandler");
+    }
+    _out.reset(outputBuffer, outputBufferCapacity);
+    _out.rewind(0, outputBufferCapacity);
+
+    LOG("[BatchHandler::onSetup] input Capacity %d, output capacity %d", inputBufferCapacity, _out.limit());
+  }
+  configure(_config);
+}
+
+ResultBuffer * BatchHandler::call(const Command& cmd, ParameterBuffer * param) {
+  JNIEnv * env = JNU_GetJNIEnv();
+  jbyteArray jcmdData = JNU_ReadWriteBufferToByteArray(env, param);
+  jbyteArray ret = (jbyteArray)env->CallObjectMethod((jobject)_processor, SendCommandToJavaMethodID,
+      cmd.id(), jcmdData);
+
+
+  if (env->ExceptionCheck()) {
+    THROW_EXCEPTION(JavaException, "SendCommandToJava throw exception");
+  }
+  return JNU_ByteArraytoReadWriteBuffer(env, ret);
+}
+
+} // namespace NativeTask
+
+///////////////////////////////////////////////////////////////
+// NativeBatchProcessor jni methods
+///////////////////////////////////////////////////////////////
+using namespace NativeTask;
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    setupHandler
+ * Signature: (J)V
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler(
+    JNIEnv * jenv, jobject processor, jlong handler, jobjectArray configs) {
+  try {
+
+    NativeTask::Config * config = new NativeTask::Config();
+    jsize len = jenv->GetArrayLength(configs);
+    for (jsize i = 0; i + 1 < len; i += 2) {
+      jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i);
+      jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1);
+      config->set(JNU_ByteArrayToString(jenv, key_obj), JNU_ByteArrayToString(jenv, val_obj));
+    }
+
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", "BatchHandler is null");
+      return;
+    }
+    jobject jinputBuffer = jenv->GetObjectField(processor, InputBufferFieldID);
+    char * inputBufferAddr = NULL;
+    uint32_t inputBufferCapacity = 0;
+    if (NULL != jinputBuffer) {
+      inputBufferAddr = (char*)(jenv->GetDirectBufferAddress(jinputBuffer));
+      inputBufferCapacity = jenv->GetDirectBufferCapacity(jinputBuffer);
+    }
+    jobject joutputBuffer = jenv->GetObjectField(processor, OutputBufferFieldID);
+    char * outputBufferAddr = NULL;
+    uint32_t outputBufferCapacity = 0;
+    if (NULL != joutputBuffer) {
+      outputBufferAddr = (char*)(jenv->GetDirectBufferAddress(joutputBuffer));
+      outputBufferCapacity = jenv->GetDirectBufferCapacity(joutputBuffer);
+    }
+    batchHandler->setProcessor(jenv->NewGlobalRef(processor));
+    batchHandler->onSetup(config, inputBufferAddr, inputBufferCapacity, outputBufferAddr,
+        outputBufferCapacity);
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeProcessInput
+ * Signature: (JI)V
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput(
+    JNIEnv * jenv, jobject processor, jlong handler, jint length) {
+
+  try {
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+      return;
+    }
+    batchHandler->onInputData(length);
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeFinish
+ * Signature: (J)V
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish(
+    JNIEnv * jenv, jobject processor, jlong handler) {
+  try {
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+      return;
+    }
+    batchHandler->onFinish();
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData(
+    JNIEnv * jenv, jobject processor, jlong handler) {
+  try {
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+      return;
+    }
+    batchHandler->onLoadData();
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeCommand
+ * Signature: (J[B)[B
+ */
+jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand(
+    JNIEnv * jenv, jobject processor, jlong handler, jint command, jbyteArray cmdData) {
+  try {
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+      return NULL;
+    }
+    Command cmd(command);
+    ParameterBuffer * param = JNU_ByteArraytoReadWriteBuffer(jenv, cmdData);
+    ResultBuffer * result = batchHandler->onCall(cmd, param);
+    jbyteArray ret = JNU_ReadWriteBufferToByteArray(jenv, result);
+
+    delete result;
+    delete param;
+    return ret;
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (const NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return NULL;
+}
+
+/*
+ * Class:     org_apace_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    InitIDs
+ * Signature: ()V
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs(JNIEnv * jenv,
+    jclass processorClass) {
+  InputBufferFieldID = jenv->GetFieldID(processorClass, "rawOutputBuffer", "Ljava/nio/ByteBuffer;");
+  OutputBufferFieldID = jenv->GetFieldID(processorClass, "rawInputBuffer", "Ljava/nio/ByteBuffer;");
+  FlushOutputMethodID = jenv->GetMethodID(processorClass, "flushOutput", "(I)V");
+  FinishOutputMethodID = jenv->GetMethodID(processorClass, "finishOutput", "()V");
+  SendCommandToJavaMethodID = jenv->GetMethodID(processorClass, "sendCommandToJava", "(I[B)[B");
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BATCHHANDLER_H_
+#define BATCHHANDLER_H_
+
+#include "NativeTask.h"
+#include "lib/Buffers.h"
+
+namespace NativeTask {
+
+/**
+ * Native side counterpart of java side NativeBatchProcessor
+ */
+class BatchHandler : public Configurable {
+protected:
+  ByteBuffer _in;
+  ByteBuffer _out;
+  void * _processor;
+  Config * _config;
+public:
+  BatchHandler();
+  virtual ~BatchHandler();
+
+  virtual NativeObjectType type() {
+    return BatchHandlerType;
+  }
+
+  /**
+   * Called by native jni functions to set global jni reference
+   */
+  void setProcessor(void * processor) {
+    _processor = processor;
+  }
+
+  void releaseProcessor();
+
+  /**
+   * Called by java side to setup native side BatchHandler
+   * initialize buffers by default
+   */
+  void onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity,
+      char * outputBuffer, uint32_t outputBufferCapacity);
+
+  /**
+   * Called by java side to notice that input data available to handle
+   * @param length input buffer's available data length
+   */
+  void onInputData(uint32_t length);
+
+  virtual void onLoadData() {
+  }
+
+  /**
+   * Called by java side to notice that input has finished
+   */
+  void onFinish() {
+    finish();
+  }
+
+  /**
+   * Called by java side to send command to this handler
+   * BatchHandler ignore all command by default
+   * @param cmd command data
+   * @return command return value
+   */
+  virtual ResultBuffer * onCall(const Command& command, ReadWriteBuffer * param) {
+    return NULL;
+  }
+
+protected:
+  virtual ResultBuffer * call(const Command& cmd, ParameterBuffer * param);
+
+  /**
+   * Used by subclass, call java side flushOutput(int length)
+   * @param length output buffer's available data length
+   */
+  virtual void flushOutput();
+
+  /**
+   * Used by subclass, call java side finishOutput()
+   */
+  void finishOutput();
+
+  /**
+   * Write output buffer and use flushOutput manually,
+   * or use this helper method
+   */
+  inline void output(const char * buff, uint32_t length) {
+    while (length > 0) {
+      if (length > _out.remain()) {
+        flushOutput();
+      }
+      uint32_t remain = _out.remain();
+      uint32_t cp = length < remain ? length : remain;
+      simple_memcpy(_out.current(), buff, cp);
+      buff += cp;
+      length -= cp;
+      _out.advance(cp);
+    }
+  }
+
+  inline void outputInt(uint32_t v) {
+    if (4 > _out.remain()) {
+      flushOutput();
+    }
+    *(uint32_t*)(_out.current()) = v;
+    _out.advance(4);
+  }
+
+  /////////////////////////////////////////////////////////////
+  // Subclass should implement these if needed
+  /////////////////////////////////////////////////////////////
+
+  /**
+   * Called by onSetup, do nothing by default
+   * Subclass should override this if needed
+   */
+  virtual void configure(Config * config) {
+  }
+
+  /**
+   * Called by onFinish, flush & close output by default
+   * Subclass should override this if needed
+   */
+  virtual void finish() {
+    flushOutput();
+    finishOutput();
+  }
+  ;
+
+  /**
+   * Called by onInputData, internal input data processor,
+   * Subclass should override this if needed
+   */
+  virtual void handleInput(ByteBuffer & byteBuffer) {
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* BATCHHANDLER_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "CombineHandler.h"
+
+namespace NativeTask {
+const char * REFILL = "refill";
+const int LENGTH_OF_REFILL_STRING = 6;
+
+const Command CombineHandler::COMBINE(4, "Combine");
+
+CombineHandler::CombineHandler()
+    : _combineContext(NULL), _kvIterator(NULL), _writer(NULL), _config(NULL), _kvCached(false),
+      _kType(UnknownType), _vType(UnknownType), _combineInputRecordCount(0), _combineInputBytes(0),
+      _combineOutputRecordCount(0),_combineOutputBytes(0){
+}
+
+CombineHandler::~CombineHandler() {
+}
+
+void CombineHandler::configure(Config * config) {
+
+  _config = config;
+  MapOutputSpec::getSpecFromConfig(_config, _mapOutputSpec);
+  _kType = _mapOutputSpec.keyType;
+  _vType = _mapOutputSpec.valueType;
+}
+
+uint32_t CombineHandler::feedDataToJavaInWritableSerialization() {
+
+  uint32_t written = 0;
+  bool firstKV = true;
+  _out.position(0);
+
+  if (_kvCached) {
+    uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
+    outputInt(bswap(_key.outerLength));
+    outputInt(bswap(_value.outerLength));
+    outputKeyOrValue(_key, _kType);
+    outputKeyOrValue(_value, _vType);
+
+    written += kvLength;
+    _kvCached = false;
+    firstKV = false;
+  }
+
+  uint32_t recordCount = 0;
+  while (nextKeyValue(_key, _value)) {
+
+    //::sleep(5);
+    _kvCached = false;
+    recordCount++;
+
+    uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
+
+    if (!firstKV && kvLength > _out.remain()) {
+      _kvCached = true;
+      break;
+    } else {
+      firstKV = false;
+      //write final key length and final value length
+      outputInt(bswap(_key.outerLength));
+      outputInt(bswap(_value.outerLength));
+      outputKeyOrValue(_key, _kType);
+      outputKeyOrValue(_value, _vType);
+
+      written += kvLength;
+    }
+  }
+
+  if (_out.position() > 0) {
+    flushOutput();
+  }
+
+  _combineInputRecordCount += recordCount;
+  _combineInputBytes += written;
+  return written;
+}
+
+/**
+ * KV: key or value
+ */
+void CombineHandler::outputKeyOrValue(SerializeInfo & KV, KeyValueType type) {
+  uint32_t length = 0;
+  switch (type) {
+  case TextType:
+    output(KV.varBytes, KV.outerLength - KV.buffer.length());
+    output(KV.buffer.data(), KV.buffer.length());
+    break;
+  case BytesType:
+    outputInt(bswap(KV.buffer.length()));
+    output(KV.buffer.data(), KV.buffer.length());
+    break;
+  default:
+    output(KV.buffer.data(), KV.buffer.length());
+    break;
+  }
+}
+
+bool CombineHandler::nextKeyValue(SerializeInfo & key, SerializeInfo & value) {
+
+  if (!_kvIterator->next(key.buffer, value.buffer)) {
+    return false;
+  }
+
+  uint32_t varLength = 0;
+  switch (_kType) {
+  case TextType:
+    WritableUtils::WriteVInt(key.buffer.length(), key.varBytes, varLength);
+    key.outerLength = key.buffer.length() + varLength;
+    break;
+  case BytesType:
+    key.outerLength = key.buffer.length() + 4;
+    break;
+  default:
+    key.outerLength = key.buffer.length();
+    break;
+  }
+
+  //prepare final value length
+  uint32_t varValueLength = 0;
+  switch (_vType) {
+  case TextType:
+    WritableUtils::WriteVInt(value.buffer.length(), value.varBytes, varValueLength);
+    value.outerLength = value.buffer.length() + varValueLength;
+    break;
+  case BytesType:
+    value.outerLength = value.buffer.length() + 4;
+    break;
+  default:
+    value.outerLength = value.buffer.length();
+    break;
+  }
+
+  return true;
+}
+
+uint32_t CombineHandler::feedDataToJava(SerializationFramework serializationType) {
+  if (serializationType == WRITABLE_SERIALIZATION) {
+    return feedDataToJavaInWritableSerialization();
+  }
+  THROW_EXCEPTION(IOException, "Native Serialization not supported");
+}
+
+void CombineHandler::handleInput(ByteBuffer & in) {
+  char * buff = in.current();
+  uint32_t length = in.remain();
+  const char * end = buff + length;
+  uint32_t remain = length;
+  char * pos = buff;
+  if (_asideBuffer.remain() > 0) {
+    uint32_t filledLength = _asideBuffer.fill(pos, length);
+    pos += filledLength;
+    remain -= filledLength;
+  }
+
+  if (_asideBuffer.size() > 0 && _asideBuffer.remain() == 0) {
+    _asideBuffer.position(0);
+    write(_asideBuffer.current(), _asideBuffer.size());
+    _asideBuffer.wrap(NULL, 0);
+  }
+
+  if (remain == 0) {
+    return;
+  }
+  KVBuffer * kvBuffer = (KVBuffer *)pos;
+
+  if (unlikely(remain < kvBuffer->headerLength())) {
+    THROW_EXCEPTION(IOException, "k/v meta information incomplete");
+  }
+
+  int kvLength = kvBuffer->lengthConvertEndium();
+
+  if (kvLength > remain) {
+    _asideBytes.resize(kvLength);
+    _asideBuffer.wrap(_asideBytes.buff(), _asideBytes.size());
+    _asideBuffer.fill(pos, remain);
+    pos += remain;
+    remain = 0;
+  } else {
+    write(pos, remain);
+  }
+}
+
+void CombineHandler::write(char * buf, uint32_t length) {
+  KVBuffer * kv = NULL;
+  char * pos = buf;
+  uint32_t remain = length;
+
+  uint32_t outputRecordCount = 0;
+  while (remain > 0) {
+    kv = (KVBuffer *)pos;
+    kv->keyLength = bswap(kv->keyLength);
+    kv->valueLength = bswap(kv->valueLength);
+    _writer->write(kv->getKey(), kv->keyLength, kv->getValue(), kv->valueLength);
+    outputRecordCount++;
+    remain -= kv->length();
+    pos += kv->length();
+    ;
+  }
+
+  _combineOutputRecordCount += outputRecordCount;
+  _combineOutputBytes += length;
+}
+
+string toString(uint32_t length) {
+  string result;
+  result.reserve(4);
+  result.assign((char *)(&length), 4);
+  return result;
+}
+
+void CombineHandler::onLoadData() {
+  feedDataToJava(WRITABLE_SERIALIZATION);
+}
+
+ResultBuffer * CombineHandler::onCall(const Command& command, ParameterBuffer * param) {
+  THROW_EXCEPTION(UnsupportException, "Command not supported by RReducerHandler");
+}
+
+void CombineHandler::combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) {
+
+  _combineInputRecordCount = 0;
+  _combineOutputRecordCount = 0;
+  _combineInputBytes = 0;
+  _combineOutputBytes = 0;
+
+  this->_combineContext = &type;
+  this->_kvIterator = kvIterator;
+  this->_writer = writer;
+  call(COMBINE, NULL);
+
+  LOG("[CombineHandler] input Record Count: %d, input Bytes: %d, output Record Count: %d, output Bytes: %d",
+      _combineInputRecordCount, _combineInputBytes,
+      _combineOutputRecordCount, _combineOutputBytes);
+  return;
+}
+
+void CombineHandler::finish() {
+}
+
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _COMBINEHANDLER_H_
+#define _COMBINEHANDLER_H_
+
+#include "Combiner.h"
+#include "BatchHandler.h"
+
+namespace NativeTask {
+
+enum SerializationFramework {
+  WRITABLE_SERIALIZATION = 0,
+  NATIVE_SERIALIZATION = 1
+};
+
+struct SerializeInfo {
+  Buffer buffer;
+  uint32_t outerLength;
+  char varBytes[8];
+};
+
+class CombineHandler : public NativeTask::ICombineRunner, public NativeTask::BatchHandler {
+public:
+  static const Command COMBINE;
+
+private:
+
+  CombineContext * _combineContext;
+  KVIterator * _kvIterator;
+  IFileWriter * _writer;
+  SerializeInfo _key;
+  SerializeInfo _value;
+
+  KeyValueType _kType;
+  KeyValueType _vType;
+  MapOutputSpec _mapOutputSpec;
+  Config * _config;
+  bool _kvCached;
+
+  uint32_t _combineInputRecordCount;
+  uint32_t _combineInputBytes;
+
+  uint32_t _combineOutputRecordCount;
+  uint32_t _combineOutputBytes;
+
+  FixSizeContainer _asideBuffer;
+  ByteArray _asideBytes;
+
+public:
+  CombineHandler();
+  virtual ~CombineHandler();
+
+  virtual void handleInput(ByteBuffer & byteBuffer);
+  void finish();
+
+  ResultBuffer * onCall(const Command& command, ParameterBuffer * param);
+
+  void configure(Config * config);
+
+  void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer);
+
+  virtual void onLoadData();
+
+private:
+  void flushDataToWriter();
+  void outputKeyOrValue(SerializeInfo & info, KeyValueType type);
+  bool nextKeyValue(SerializeInfo & key, SerializeInfo & value);
+  uint32_t feedDataToJava(SerializationFramework serializationType);
+  uint32_t feedDataToJavaInWritableSerialization();
+  void write(char * buf, uint32_t length);
+
+};
+
+} /* namespace NativeTask */
+#endif /* _JAVACOMBINEHANDLER_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "MCollectorOutputHandler.h"
+#include "NativeObjectFactory.h"
+#include "MapOutputCollector.h"
+#include "CombineHandler.h"
+
+using std::string;
+using std::vector;
+
+namespace NativeTask {
+
+MCollectorOutputHandler::MCollectorOutputHandler()
+    : _collector(NULL), _dest(NULL), _endium(LARGE_ENDIUM) {
+}
+
+MCollectorOutputHandler::~MCollectorOutputHandler() {
+  _dest = NULL;
+  delete _collector;
+  _collector = NULL;
+}
+
+void MCollectorOutputHandler::configure(Config * config) {
+  if (NULL == config) {
+    return;
+  }
+
+  uint32_t partition = config->getInt(MAPRED_NUM_REDUCES, 1);
+
+  _collector = new MapOutputCollector(partition, this);
+  _collector->configure(config);
+}
+
+void MCollectorOutputHandler::finish() {
+  _collector->close();
+  BatchHandler::finish();
+}
+
+void MCollectorOutputHandler::handleInput(ByteBuffer & in) {
+  char * buff = in.current();
+  uint32_t length = in.remain();
+
+  const char * end = buff + length;
+  char * pos = buff;
+  if (_kvContainer.remain() > 0) {
+    uint32_t filledLength = _kvContainer.fill(pos, length);
+    pos += filledLength;
+  }
+
+  while (end - pos > 0) {
+    KVBufferWithParititionId * kvBuffer = (KVBufferWithParititionId *)pos;
+
+    if (unlikely(end - pos < KVBuffer::headerLength())) {
+      THROW_EXCEPTION(IOException, "k/v meta information incomplete");
+    }
+
+    if (_endium == LARGE_ENDIUM) {
+      kvBuffer->partitionId = bswap(kvBuffer->partitionId);
+      kvBuffer->buffer.keyLength = bswap(kvBuffer->buffer.keyLength);
+      kvBuffer->buffer.valueLength = bswap(kvBuffer->buffer.valueLength);
+    }
+
+    uint32_t kvLength = kvBuffer->buffer.length();
+
+    KVBuffer * dest = allocateKVBuffer(kvBuffer->partitionId, kvLength);
+    _kvContainer.wrap((char *)dest, kvLength);
+
+    pos += 4; //skip the partition length
+    uint32_t filledLength = _kvContainer.fill(pos, end - pos);
+    pos += filledLength;
+  }
+}
+
+KVBuffer * MCollectorOutputHandler::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) {
+  KVBuffer * dest = _collector->allocateKVBuffer(partitionId, kvlength);
+  return dest;
+}
+
+}      //namespace

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MCOLLECTOROUTPUTHANDLER_H_
+#define MCOLLECTOROUTPUTHANDLER_H_
+
+#include "BatchHandler.h"
+#include "lib/SpillOutputService.h"
+#include "AbstractMapHandler.h"
+
+namespace NativeTask {
+class MapOutputCollector;
+
+class MCollectorOutputHandler : public AbstractMapHandler {
+private:
+
+  FixSizeContainer _kvContainer;
+
+  MapOutputCollector * _collector;
+  // state info for large KV pairs
+  char * _dest;
+
+  Endium _endium;
+
+public:
+  MCollectorOutputHandler();
+  virtual ~MCollectorOutputHandler();
+
+  virtual void configure(Config * config);
+  virtual void finish();
+  virtual void handleInput(ByteBuffer & byteBuffer);
+private:
+  KVBuffer * allocateKVBuffer(uint32_t partition, uint32_t kvlength);
+};
+
+}
+
+#endif /* MCOLLECTOROUTPUTHANDLER_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,54 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_hadoop_mapred_nativetask_NativeBatchProcessor */
+
+#ifndef _Included_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+#define _Included_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+#ifdef __cplusplus
+extern "C" {
+#endif
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    setupHandler
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler(
+    JNIEnv *, jobject, jlong, jobjectArray configs);
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeProcessInput
+ * Signature: (JI)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput(
+    JNIEnv *, jobject, jlong, jint);
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeFinish
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish(
+    JNIEnv *, jobject, jlong);
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeCommand
+ * Signature: (J[B)[B
+ */JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand(
+    JNIEnv *, jobject, jlong, jint, jbyteArray);
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeLoadData
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData(
+    JNIEnv *, jobject, jlong);
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    InitIDs
+ * Signature: ()V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs(
+    JNIEnv *, jclass);
+
+#ifdef __cplusplus
+}
+#endif
+#endif

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "BufferStream.h"
+
+namespace NativeTask {
+
+BufferedInputStream::BufferedInputStream(InputStream * stream, uint32_t bufferSize)
+    : FilterInputStream(stream), _buff(NULL), _position(0), _limit(0), _capacity(0) {
+  _buff = (char*)malloc(bufferSize);
+  if (NULL != _buff) {
+    LOG("[BuferStream] malloc failed when create BufferedInputStream with buffersize %u",
+        bufferSize);
+    _capacity = bufferSize;
+  }
+}
+
+BufferedInputStream::~BufferedInputStream() {
+  if (NULL != _buff) {
+    free(_buff);
+    _buff = NULL;
+    _position = 0;
+    _limit = 0;
+    _capacity = 0;
+  }
+}
+
+void BufferedInputStream::seek(uint64_t position) {
+  if (_limit - _position > 0) {
+    THROW_EXCEPTION(IOException, "temporary buffered data exists when fseek()");
+  }
+  _stream->seek(position);
+}
+
+uint64_t BufferedInputStream::tell() {
+  return _stream->tell() - (_limit - _position);
+}
+
+int32_t BufferedInputStream::read(void * buff, uint32_t length) {
+  uint32_t rest = _limit - _position;
+  if (rest > 0) {
+    // have some data in buffer, read from buffer
+    uint32_t cp = rest < length ? rest : length;
+    memcpy(buff, _buff + _position, cp);
+    _position += cp;
+    return cp;
+  } else if (length >= _capacity / 2) {
+    // dest buffer big enough, read to dest buffer directly
+    return _stream->read(buff, length);
+  } else {
+    // read to buffer first, then copy part of it to dest
+    _limit = 0;
+    do {
+      int32_t rd = _stream->read(_buff + _limit, _capacity - _limit);
+      if (rd <= 0) {
+        break;
+      }
+    } while (_limit < _capacity / 2);
+    if (_limit == 0) {
+      return -1;
+    }
+    uint32_t cp = _limit < length ? _limit : length;
+    memcpy(buff, _buff, cp);
+    _position = cp;
+    return cp;
+  }
+}
+
+/////////////////////////////////////////////////////////////////
+
+BufferedOutputStream::BufferedOutputStream(InputStream * stream, uint32_t bufferSize)
+    : FilterOutputStream(_stream), _buff(NULL), _position(0), _capacity(0) {
+  _buff = (char*)malloc(bufferSize + sizeof(uint64_t));
+  if (NULL != _buff) {
+    LOG("[BuferStream] malloc failed when create BufferedOutputStream with buffersize %u",
+        bufferSize);
+    _capacity = bufferSize;
+  }
+}
+
+BufferedOutputStream::~BufferedOutputStream() {
+  if (NULL != _buff) {
+    free(_buff);
+    _buff = NULL;
+    _position = 0;
+    _capacity = 0;
+  }
+}
+
+uint64_t BufferedOutputStream::tell() {
+  return _stream->tell() + _position;
+}
+
+void BufferedOutputStream::write(const void * buff, uint32_t length) {
+  if (length < _capacity / 2) {
+    uint32_t rest = _capacity - _position;
+    if (length < rest) {
+      simple_memcpy(_buff + _position, buff, length);
+      _position += length;
+    } else {
+      flush();
+      simple_memcpy(_buff, buff, length);
+      _position = length;
+    }
+  } else {
+    flush();
+    _stream->write(buff, length);
+  }
+}
+
+void BufferedOutputStream::flush() {
+  if (_position > 0) {
+    _stream->write(_buff, _position);
+    _position = 0;
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+int32_t InputBuffer::read(void * buff, uint32_t length) {
+  uint32_t rd = _capacity - _position < length ? _capacity - _position : length;
+  if (rd > 0) {
+    memcpy(buff, _buff + _position, rd);
+    _position += rd;
+    return rd;
+  }
+  return length == 0 ? 0 : -1;
+}
+
+void OutputBuffer::write(const void * buff, uint32_t length) {
+  if (_position + length <= _capacity) {
+    memcpy(_buff + _position, buff, length);
+    _position += length;
+  } else {
+    THROW_EXCEPTION(IOException, "OutputBuffer too small to write");
+  }
+}
+
+} // namespace NativeTask

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BUFFERSTREAM_H_
+#define BUFFERSTREAM_H_
+
+#include <string>
+#include "Streams.h"
+
+namespace NativeTask {
+
+using std::string;
+
+class BufferedInputStream : public FilterInputStream {
+protected:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _limit;
+  uint32_t _capacity;
+public:
+  BufferedInputStream(InputStream * stream, uint32_t bufferSize = 64 * 1024);
+
+  virtual ~BufferedInputStream();
+
+  virtual void seek(uint64_t position);
+
+  virtual uint64_t tell();
+
+  virtual int32_t read(void * buff, uint32_t length);
+};
+
+class BufferedOutputStream : public FilterOutputStream {
+protected:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+
+public:
+  BufferedOutputStream(InputStream * stream, uint32_t bufferSize = 64 * 1024);
+
+  virtual ~BufferedOutputStream();
+
+  virtual uint64_t tell();
+
+  virtual void write(const void * buff, uint32_t length);
+
+  virtual void flush();
+
+};
+
+class InputBuffer : public InputStream {
+protected:
+  const char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+public:
+  InputBuffer()
+      : _buff(NULL), _position(0), _capacity(0) {
+  }
+
+  InputBuffer(const char * buff, uint32_t capacity)
+      : _buff(buff), _position(0), _capacity(capacity) {
+  }
+
+  InputBuffer(const string & src)
+      : _buff(src.data()), _position(0), _capacity(src.length()) {
+  }
+
+  virtual ~InputBuffer() {
+  }
+
+  virtual void seek(uint64_t position) {
+    if (position <= _capacity) {
+      _position = position;
+    } else {
+      _position = _capacity;
+    }
+  }
+
+  virtual uint64_t tell() {
+    return _position;
+  }
+
+  virtual int32_t read(void * buff, uint32_t length);
+
+  void reset(const char * buff, uint32_t capacity) {
+    _buff = buff;
+    _position = 0;
+    _capacity = capacity;
+  }
+
+  void reset(const string & src) {
+    _buff = src.data();
+    _position = 0;
+    _capacity = src.length();
+  }
+
+  void rewind() {
+    _position = 0;
+  }
+};
+
+class OutputBuffer : public OutputStream {
+protected:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+public:
+  OutputBuffer()
+      : _buff(NULL), _position(0), _capacity(0) {
+  }
+
+  OutputBuffer(char * buff, uint32_t capacity)
+      : _buff(buff), _position(0), _capacity(capacity) {
+  }
+
+  virtual ~OutputBuffer() {
+  }
+
+  virtual uint64_t tell() {
+    return _position;
+  }
+
+  virtual void write(const void * buff, uint32_t length);
+
+  void clear() {
+    _position = 0;
+  }
+
+  void reset(char * buff, uint32_t capacity) {
+    _buff = buff;
+    _position = 0;
+    _capacity = capacity;
+  }
+
+  string getString() {
+    return string(_buff, _position);
+  }
+};
+
+class OutputStringStream : public OutputStream {
+protected:
+  string * _dest;
+public:
+  OutputStringStream()
+      : _dest(NULL) {
+  }
+
+  OutputStringStream(string & dest)
+      : _dest(&dest) {
+  }
+  virtual ~OutputStringStream() {
+  }
+
+  virtual uint64_t tell() {
+    return _dest->length();
+  }
+
+  virtual void write(const void * buff, uint32_t length) {
+    _dest->append((const char *)buff, length);
+  }
+
+  void reset(string * dest) {
+    _dest = dest;
+  }
+
+  void clear() {
+    _dest->clear();
+  }
+
+  string getString() {
+    return *_dest;
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* BUFFERSTREAM_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "util/WritableUtils.h"
+#include "Buffers.h"
+
+namespace NativeTask {
+
+DynamicBuffer::DynamicBuffer()
+    : _data(NULL), _capacity(0), _size(0), _used(0) {
+}
+
+DynamicBuffer::DynamicBuffer(uint32_t capacity)
+    : _data(NULL), _capacity(0), _size(0), _used(0) {
+  reserve(capacity);
+}
+
+DynamicBuffer::~DynamicBuffer() {
+  release();
+}
+
+void DynamicBuffer::release() {
+  if (_data != NULL) {
+    free(_data);
+    _data = NULL;
+    _capacity = 0;
+    _used = 0;
+  }
+}
+
+void DynamicBuffer::reserve(uint32_t capacity) {
+  if (_data != NULL) {
+    if (capacity > _capacity) {
+      char * newdata = (char*)realloc(_data, capacity);
+      if (newdata == NULL) {
+        THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve realloc %u failed",
+            capacity);
+      }
+      _data = newdata;
+      _capacity = capacity;
+    }
+    return;
+  }
+  release();
+  char * newdata = (char*)malloc(capacity);
+  if (newdata == NULL) {
+    THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve new %u failed", capacity);
+  }
+  _data = newdata;
+  _capacity = capacity;
+  _size = 0;
+  _used = 0;
+}
+
+int32_t DynamicBuffer::refill(InputStream * stream) {
+  if (_data == NULL || freeSpace() == 0) {
+    THROW_EXCEPTION(IOException, "refill DynamicBuffer failed, no space left");
+  }
+  int32_t rd = stream->read(_data + _size, freeSpace());
+  if (rd > 0) {
+    _size += rd;
+  }
+  return rd;
+}
+
+void DynamicBuffer::cleanUsed() {
+  if (_used > 0) {
+    uint32_t needToMove = _size - _used;
+    if (needToMove > 0) {
+      memmove(_data, _data + _used, needToMove);
+      _size = needToMove;
+    } else {
+      _size = 0;
+    }
+    _used = 0;
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+ReadBuffer::ReadBuffer()
+    : _buff(NULL), _remain(0), _size(0), _capacity(0), _stream(NULL), _source(NULL) {
+}
+
+void ReadBuffer::init(uint32_t size, InputStream * stream, const string & codec) {
+  if (size < 1024) {
+    THROW_EXCEPTION_EX(UnsupportException, "ReadBuffer size %u not support.", size);
+  }
+  _buff = (char *)malloc(size);
+  if (NULL == _buff) {
+    THROW_EXCEPTION(OutOfMemoryException, "create append buffer");
+  }
+  _capacity = size;
+  _remain = 0;
+  _size = 0;
+  _stream = stream;
+  _source = _stream;
+  if (codec.length() > 0) {
+    if (!Compressions::support(codec)) {
+      THROW_EXCEPTION(UnsupportException, "compression codec not support");
+    }
+    _source = Compressions::getDecompressionStream(codec, _stream, size);
+  }
+}
+
+ReadBuffer::~ReadBuffer() {
+  if (_source != _stream) {
+    delete _source;
+    _source = NULL;
+  }
+  if (NULL != _buff) {
+    free(_buff);
+    _buff = NULL;
+    _capacity = 0;
+    _remain = 0;
+    _size = 0;
+  }
+}
+
+char * ReadBuffer::fillGet(uint32_t count) {
+
+  if (unlikely(count > _capacity)) {
+    uint32_t newcap = _capacity * 2 > count ? _capacity * 2 : count;
+    char * newbuff = (char*)malloc(newcap);
+
+
+    if (newbuff == NULL) {
+      THROW_EXCEPTION(OutOfMemoryException,
+          StringUtil::Format("buff realloc failed, size=%u", newcap));
+    }
+
+    if (_remain > 0) {
+      memcpy(newbuff, current(), _remain);
+    }
+    if (NULL != _buff) {
+      free(_buff);
+    }
+
+    _buff = newbuff;
+    _capacity = newcap;
+  } else {
+    if (_remain > 0) {
+      memcpy(_buff, current(), _remain);
+    }
+  }
+  _size = _remain;
+  while (_remain < count) {
+    int32_t rd = _source->read(_buff + _size, _capacity - _size);
+    if (rd <= 0) {
+      THROW_EXCEPTION(IOException, "read reach EOF");
+    }
+    _remain += rd;
+    _size += rd;
+  }
+  char * ret = current();
+  _remain -= count;
+  return ret;
+}
+
+int32_t ReadBuffer::fillRead(char * buff, uint32_t len) {
+  uint32_t cp = _remain;
+  if (cp > 0) {
+    memcpy(buff, current(), cp);
+    _remain = 0;
+  }
+  // TODO: read to buffer first
+  int32_t ret = _source->readFully(buff + cp, len - cp);
+  if (ret < 0 && cp == 0) {
+    return ret;
+  } else {
+    return ret < 0 ? cp : ret + cp;
+  }
+}
+
+int64_t ReadBuffer::fillReadVLong() {
+  if (_remain == 0) {
+    int32_t rd = _source->read(_buff, _capacity);
+    if (rd <= 0) {
+      THROW_EXCEPTION(IOException, "fillReadVLong reach EOF");
+    }
+    _remain = rd;
+    _size = rd;
+  }
+  int8_t * pos = (int8_t*)current();
+  if (*pos >= -112) {
+    _remain--;
+    return (int64_t)*pos;
+  }
+  bool neg = *pos < -120;
+  uint32_t len = neg ? (-119 - *pos) : (-111 - *pos);
+  pos = (int8_t*)get(len);
+  const int8_t * end = pos + len;
+  uint64_t value = 0;
+  while (++pos < end) {
+    value = (value << 8) | *(uint8_t*)pos;
+  }
+  return neg ? (value ^ -1LL) : value;
+}
+
+///////////////////////////////////////////////////////////
+
+AppendBuffer::AppendBuffer()
+    : _buff(NULL), _remain(0), _capacity(0), _counter(0), _stream(NULL), _dest(NULL),
+        _compression(false) {
+}
+
+void AppendBuffer::init(uint32_t size, OutputStream * stream, const string & codec) {
+  if (size < 1024) {
+    THROW_EXCEPTION_EX(UnsupportException, "AppendBuffer size %u not support.", size);
+  }
+  _buff = (char *)malloc(size + 8);
+  if (NULL == _buff) {
+    THROW_EXCEPTION(OutOfMemoryException, "create append buffer");
+  }
+  _capacity = size;
+  _remain = _capacity;
+  _stream = stream;
+  _dest = _stream;
+  if (codec.length() > 0) {
+    if (!Compressions::support(codec)) {
+      THROW_EXCEPTION(UnsupportException, "compression codec not support");
+    }
+    _dest = Compressions::getCompressionStream(codec, _stream, size);
+    _compression = true;
+  }
+}
+
+CompressStream * AppendBuffer::getCompressionStream() {
+  if (_compression) {
+    return (CompressStream *)_dest;
+  } else {
+    return NULL;
+  }
+}
+
+AppendBuffer::~AppendBuffer() {
+  if (_dest != _stream) {
+    delete _dest;
+    _dest = NULL;
+  }
+  if (NULL != _buff) {
+    free(_buff);
+    _buff = NULL;
+    _remain = 0;
+    _capacity = 0;
+  }
+}
+
+void AppendBuffer::flushd() {
+  _dest->write(_buff, _capacity - _remain);
+  _counter += _capacity - _remain;
+  _remain = _capacity;
+}
+
+void AppendBuffer::write_inner(const void * data, uint32_t len) {
+  flushd();
+  if (len >= _capacity / 2) {
+    _dest->write(data, len);
+    _counter += len;
+  } else {
+    simple_memcpy(_buff, data, len);
+    _remain -= len;
+  }
+}
+
+void AppendBuffer::write_vlong_inner(int64_t v) {
+  if (_remain < 9) {
+    flushd();
+  }
+  uint32_t len;
+  WritableUtils::WriteVLong(v, current(), len);
+  _remain -= len;
+}
+
+void AppendBuffer::write_vuint2_inner(uint32_t v1, uint32_t v2) {
+  if (_remain < 10) {
+    flushd();
+  }
+  uint32_t len;
+  WritableUtils::WriteVLong(v1, current(), len);
+  _remain -= len;
+  WritableUtils::WriteVLong(v2, current(), len);
+  _remain -= len;
+}
+
+} // namespace NativeTask
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,694 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BUFFERS_H_
+#define BUFFERS_H_
+
+#include "Streams.h"
+#include "Compressions.h"
+#include "Constants.h"
+
+namespace NativeTask {
+
+class DynamicBuffer {
+protected:
+  char * _data;
+  uint32_t _capacity;
+  uint32_t _size;
+  uint32_t _used;
+public:
+  DynamicBuffer();
+
+  DynamicBuffer(uint32_t capacity);
+
+  ~DynamicBuffer();
+
+  void reserve(uint32_t capacity);
+
+  void release();
+
+  uint32_t capacity() {
+    return _capacity;
+  }
+
+  char * data() {
+    return _data;
+  }
+
+  uint32_t size() {
+    return _size;
+  }
+
+  uint32_t used() {
+    return _used;
+  }
+
+  char * current() {
+    return _data + _used;
+  }
+
+  char * end() {
+    return _data + _size;
+  }
+
+  uint32_t remain() {
+    return _size - _used;
+  }
+
+  uint32_t freeSpace() {
+    return _capacity - _size;
+  }
+
+  void use(uint32_t count) {
+    _used += count;
+  }
+
+  void cleanUsed();
+
+  int32_t refill(InputStream * stream);
+};
+
+/**
+ * A lightweight read buffer, act as buffered input stream
+ */
+class ReadBuffer {
+protected:
+  char * _buff;
+  uint32_t _remain;
+  uint32_t _size;
+  uint32_t _capacity;
+
+  InputStream * _stream;
+  InputStream * _source;
+
+protected:
+  inline char * current() {
+    return _buff + _size - _remain;
+  }
+
+  char * fillGet(uint32_t count);
+  int32_t fillRead(char * buff, uint32_t len);
+  int64_t fillReadVLong();
+public:
+  ReadBuffer();
+
+  void init(uint32_t size, InputStream * stream, const string & codec);
+
+  ~ReadBuffer();
+
+  /**
+   * use get() to get inplace continuous memory of small object
+   */
+  inline char * get(uint32_t count) {
+    if (likely(count <= _remain)) {
+      char * ret = current();
+      _remain -= count;
+      return ret;
+    }
+    return fillGet(count);
+  }
+
+  /**
+   * read to outside buffer
+   */
+  inline int32_t read(char * buff, uint32_t len) {
+    if (likely(len <= _remain)) {
+      memcpy(buff, current(), len);
+      _remain -= len;
+      return len;
+    }
+    return fillRead(buff, len);
+  }
+
+  /**
+   * read to outside buffer, use simple_memcpy
+   */
+  inline void readUnsafe(char * buff, uint32_t len) {
+    if (likely(len <= _remain)) {
+      simple_memcpy(buff, current(), len);
+      _remain -= len;
+      return;
+    }
+    fillRead(buff, len);
+  }
+
+  /**
+   * read VUInt
+   */
+  inline int64_t readVLong() {
+    if (likely(_remain > 0)) {
+      char * mark = current();
+      if (*(int8_t*)mark >= (int8_t)-112) {
+        _remain--;
+        return (int64_t)*mark;
+      }
+    }
+    return fillReadVLong();
+  }
+
+  /**
+   * read uint32_t little endian
+   */
+  inline uint32_t read_uint32_le() {
+    return *(uint32_t*)get(4);
+  }
+
+  /**
+   * read uint32_t big endian
+   */
+  inline uint32_t read_uint32_be() {
+    return bswap(read_uint32_le());
+  }
+};
+
+/**
+ * A light weighted append buffer, used as buffered output streams
+ */
+class AppendBuffer {
+protected:
+  char * _buff;
+  uint32_t _remain;
+  uint32_t _capacity;
+  uint64_t _counter;
+
+  OutputStream * _stream;
+  OutputStream * _dest;
+  bool _compression;
+
+protected:
+  void flushd();
+
+  inline char * current() {
+    return _buff + _capacity - _remain;
+  }
+
+  void write_inner(const void * data, uint32_t len);
+  void write_vlong_inner(int64_t v);
+  void write_vuint2_inner(uint32_t v1, uint32_t v2);
+public:
+  AppendBuffer();
+
+  ~AppendBuffer();
+
+  void init(uint32_t size, OutputStream * stream, const string & codec);
+
+  CompressStream * getCompressionStream();
+
+  uint64_t getCounter() {
+    return _counter;
+  }
+
+  inline char * borrowUnsafe(uint32_t len) {
+    if (likely(_remain >= len)) {
+      return current();
+    }
+    if (likely(_capacity >= len)) {
+      flushd();
+      return _buff;
+    }
+    return NULL;
+  }
+
+  inline void useUnsafe(uint32_t count) {
+    _remain -= count;
+  }
+
+  inline void write(char c) {
+    if (unlikely(_remain == 0)) {
+      flushd();
+    }
+    *current() = c;
+    _remain--;
+  }
+
+  inline void write(const void * data, uint32_t len) {
+    if (likely(len <= _remain)) { // append directly
+      simple_memcpy(current(), data, len);
+      _remain -= len;
+      return;
+    }
+    write_inner(data, len);
+  }
+
+  inline void write_uint32_le(uint32_t v) {
+    if (unlikely(4 > _remain)) {
+      flushd();
+    }
+    *(uint32_t*)current() = v;
+    _remain -= 4;
+    return;
+  }
+
+  inline void write_uint32_be(uint32_t v) {
+    write_uint32_le(bswap(v));
+  }
+
+  inline void write_uint64_le(uint64_t v) {
+    if (unlikely(8 > _remain)) {
+      flushd();
+    }
+    *(uint64_t*)current() = v;
+    _remain -= 8;
+    return;
+  }
+
+  inline void write_uint64_be(uint64_t v) {
+    write_uint64_le(bswap64(v));
+  }
+
+  inline void write_vlong(int64_t v) {
+    if (likely(_remain > 0 && v <= 127 && v >= -112)) {
+      *(char*)current() = (char)v;
+      _remain--;
+      return;
+    }
+    write_vlong_inner(v);
+  }
+
+  inline void write_vuint(uint32_t v) {
+    if (likely(_remain > 0 && v <= 127)) {
+      *(char*)current() = (char)v;
+      _remain--;
+      return;
+    }
+    write_vlong_inner(v);
+  }
+
+  inline void write_vuint2(uint32_t v1, uint32_t v2) {
+    if (likely(_remain >= 2 && v1 <= 127 && v2 <= 127)) {
+      *(char*)current() = (char)v1;
+      *(char*)(current() + 1) = (char)v2;
+      _remain -= 2;
+      return;
+    }
+    write_vuint2_inner(v1, v2);
+  }
+
+  /**
+   * flush current buffer, clear content
+   */
+  inline void flush() {
+    if (_remain < _capacity) {
+      flushd();
+    }
+  }
+};
+
+/**
+ * Memory Key-Value buffer pair with direct address content, so can be
+ * easily copied or dumped to file
+ */
+struct KVBuffer {
+  uint32_t keyLength;
+  uint32_t valueLength;
+  char content[1];
+
+  char * getKey() {
+    return content;
+  }
+
+  char * getValue() {
+    return content + keyLength;
+  }
+
+  KVBuffer * next() {
+    return ((KVBuffer*)(content + keyLength + valueLength));
+  }
+
+  std::string str() {
+    return std::string(content, keyLength) + "\t" + std::string(getValue(), valueLength);
+  }
+
+  uint32_t length() {
+    return keyLength + valueLength + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+  }
+
+  uint32_t lengthConvertEndium() {
+    long value = bswap64(*((long *)this));
+    return (value >> 32) + value + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+  }
+
+  void fill(const void * key, uint32_t keylen, const void * value, uint32_t vallen) {
+    keyLength = keylen;
+    valueLength = vallen;
+
+    if (keylen > 0) {
+      simple_memcpy(getKey(), key, keylen);
+    }
+    if (vallen > 0) {
+      simple_memcpy(getValue(), value, vallen);
+    }
+  }
+
+  static uint32_t headerLength() {
+    return SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+  }
+};
+
+struct KVBufferWithParititionId {
+  uint32_t partitionId;
+  KVBuffer buffer;
+
+  inline static uint32_t minLength() {
+    return SIZE_OF_PARTITION_LENGTH + SIZE_OF_KV_LENGTH;
+  }
+
+  int length() {
+    return 4 + buffer.length();
+  }
+
+  int lengthConvertEndium() {
+    return 4 + buffer.lengthConvertEndium();
+  }
+};
+
+/**
+ * Native side abstraction of java ByteBuffer
+ */
+class ByteBuffer {
+private:
+  char * _buff;
+  uint32_t _limit;
+  uint32_t _position;
+  uint32_t _capacity;
+
+public:
+  ByteBuffer()
+      : _buff(NULL), _limit(0), _position(0), _capacity(0) {
+  }
+
+  ~ByteBuffer() {
+  }
+
+  void reset(char * buff, uint32_t inputCapacity) {
+    this->_buff = buff;
+    this->_capacity = inputCapacity;
+    this->_position = 0;
+    this->_limit = 0;
+  }
+
+  int capacity() {
+    return this->_capacity;
+  }
+
+  int remain() {
+    return _limit - _position;
+  }
+
+  int limit() {
+    return _limit;
+  }
+
+  int advance(int positionOffset) {
+    _position += positionOffset;
+    return _position;
+  }
+
+  int position() {
+    return this->_position;
+  }
+
+  void position(int newPos) {
+    this->_position = newPos;
+  }
+
+  void rewind(int newPos, int newLimit) {
+    this->_position = newPos;
+    if (newLimit > this->_capacity) {
+      THROW_EXCEPTION(IOException, "length larger than input buffer capacity");
+    }
+    this->_limit = newLimit;
+  }
+
+  char * current() {
+    return _buff + _position;
+  }
+
+  char * base() {
+    return _buff;
+  }
+};
+
+class ByteArray {
+private:
+  char * _buff;
+  uint32_t _length;
+  uint32_t _capacity;
+
+public:
+  ByteArray()
+      : _buff(NULL), _length(0), _capacity(0) {
+  }
+
+  ~ByteArray() {
+    if (NULL != _buff) {
+      delete[] _buff;
+      _buff = NULL;
+    }
+    _length = 0;
+    _capacity = 0;
+  }
+
+  void resize(uint32_t newSize) {
+    if (newSize <= _capacity) {
+      _length = newSize;
+    } else {
+      if (NULL != _buff) {
+        delete[] _buff;
+        _buff = NULL;
+      }
+      _capacity = 2 * newSize;
+      _buff = new char[_capacity];
+      _length = newSize;
+    }
+  }
+
+  char * buff() {
+    return _buff;
+  }
+
+  uint32_t size() {
+    return _length;
+  }
+};
+
+class FixSizeContainer {
+private:
+  char * _buff;
+  uint32_t _pos;
+  uint32_t _size;
+
+public:
+  FixSizeContainer()
+      : _buff(NULL), _pos(0), _size(0) {
+  }
+
+  ~FixSizeContainer() {
+  }
+
+  void wrap(char * buff, uint32_t size) {
+    _size = size;
+    _buff = buff;
+    _pos = 0;
+  }
+
+  void rewind() {
+    _pos = 0;
+  }
+
+  uint32_t remain() {
+    return _size - _pos;
+  }
+
+  char * current() {
+    return _buff + _pos;
+  }
+
+  char * base() {
+    return _buff;
+  }
+
+  uint32_t size() {
+    return _size;
+  }
+
+  /**
+   * return the length of actually filled data.
+   */
+  uint32_t fill(const char * source, uint32_t maxSize) {
+    int remain = _size - _pos;
+    if (remain <= 0) {
+      return 0;
+    }
+
+    uint32_t length = (maxSize < remain) ? maxSize : remain;
+    simple_memcpy(_buff + _pos, source, length);
+    _pos += length;
+    return length;
+  }
+
+  uint32_t position() {
+    return _pos;
+  }
+
+  void position(int pos) {
+    _pos = pos;
+  }
+};
+
+class ReadWriteBuffer {
+private:
+
+  static const uint32_t INITIAL_LENGTH = 16;
+
+  uint32_t _readPoint;
+  uint32_t _writePoint;
+  char * _buff;
+  uint32_t _buffLength;
+  bool _newCreatedBuff;
+
+public:
+
+  ReadWriteBuffer(uint32_t length)
+      : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) {
+    _buffLength = length;
+    if (_buffLength > 0) {
+      _buff = new char[_buffLength];
+      _newCreatedBuff = true;
+    }
+  }
+
+  ReadWriteBuffer()
+      : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) {
+  }
+
+  ~ReadWriteBuffer() {
+    if (_newCreatedBuff) {
+      delete[] _buff;
+      _buff = NULL;
+    }
+  }
+
+  void setReadPoint(uint32_t pos) {
+    _readPoint = pos;
+  }
+
+  void setWritePoint(uint32_t pos) {
+    _writePoint = pos;
+  }
+
+  char * getBuff() {
+    return _buff;
+  }
+
+  uint32_t getWritePoint() {
+    return _writePoint;
+  }
+
+  uint32_t getReadPoint() {
+    return _readPoint;
+  }
+
+  void writeInt(uint32_t param) {
+    uint32_t written = param;
+
+    checkWriteSpaceAndResizeIfNecessary(4);
+    *((uint32_t *)(_buff + _writePoint)) = written;
+    _writePoint += 4;
+  }
+
+  void writeLong(uint64_t param) {
+    uint64_t written = param;
+
+    checkWriteSpaceAndResizeIfNecessary(8);
+    *((uint64_t *)(_buff + _writePoint)) = written;
+    _writePoint += 8;
+  }
+
+  void writeString(const char * param, uint32_t length) {
+    writeInt(length);
+    checkWriteSpaceAndResizeIfNecessary(length);
+
+    memcpy(_buff + _writePoint, param, length);
+    _writePoint += length;
+  }
+
+  void writeString(std::string * param) {
+    const char * str = param->c_str();
+    int length = param->size();
+    writeString(str, length);
+  }
+
+  void writePointer(void * param) {
+    uint64_t written = (uint64_t)(param);
+    writeLong(written);
+  }
+
+  uint32_t readInt() {
+    char * readPos = _buff + _readPoint;
+    uint32_t result = *((uint32_t *)(readPos));
+    _readPoint += 4;
+    return result;
+  }
+
+  uint64_t readLong() {
+    char * readPos = _buff + _readPoint;
+    uint64_t result = *((uint64_t *)(readPos));
+    _readPoint += 8;
+    return result;
+  }
+
+  std::string * readString() {
+    uint32_t len = readInt();
+    char * strBegin = _buff + _readPoint;
+    _readPoint += len;
+    return new std::string(strBegin, len);
+  }
+
+  void * readPointer() {
+    uint64_t result = readLong();
+    return (void *)(result);
+  }
+
+private:
+  void checkWriteSpaceAndResizeIfNecessary(uint32_t toBeWritten) {
+    if (_buffLength == 0) {
+      _newCreatedBuff = true;
+      _buffLength = INITIAL_LENGTH > toBeWritten ? INITIAL_LENGTH : toBeWritten;
+      _buff = new char[_buffLength];
+    }
+
+    if (_buffLength - _writePoint >= toBeWritten) {
+      return;
+    }
+
+    _buffLength = _buffLength + toBeWritten;
+    _newCreatedBuff = true;
+    char * newBuff = new char[_buffLength];
+    memcpy(newBuff, _buff, _writePoint);
+    delete[] _buff;
+    _buff = newBuff;
+  }
+};
+
+typedef ReadWriteBuffer ParameterBuffer;
+typedef ReadWriteBuffer ResultBuffer;
+
+} // namespace NativeTask
+
+#endif /* BUFFERS_H_ */