You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC

svn commit: r1611413 [3/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.util.ServiceLoader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Loads every {@link Platform} implementation that {@link ServiceLoader}
+ * finds on the classpath and initializes them. It is also the facade used to
+ * ask whether any loaded platform supports a key class or defines a key
+ * comparator.
+ */
+public class Platforms {
+
+  private static final Logger LOG = Logger.getLogger(Platforms.class);
+  private static final ServiceLoader<Platform> platforms = ServiceLoader.load(Platform.class);
+
+  /**
+   * Resets the shared {@link NativeSerialization} instance and then calls
+   * {@code init()} on every platform found on the classpath.
+   *
+   * @param conf not used by this method
+   * @throws IOException declared for callers; presumably raised by
+   *           {@code Platform.init()} (confirm against {@link Platform})
+   */
+  public static void init(Configuration conf) throws IOException {
+    NativeSerialization.getInstance().reset();
+    // ServiceLoader instances are not safe for concurrent use, so every
+    // traversal in this class holds the loader's monitor.
+    synchronized (platforms) {
+      for (Platform platform : platforms) {
+        platform.init();
+      }
+    }
+  }
+
+  /**
+   * Asks each platform in turn whether it supports the given key class.
+   *
+   * @param keyClassName name of the key class to check
+   * @param serializer serializer handed to each platform's {@code support}
+   * @param job job configuration handed to each platform's {@code support}
+   * @return {@code true} as soon as one platform reports support,
+   *         {@code false} if none does
+   */
+  public static boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
+    synchronized (platforms) {
+      for (Platform platform : platforms) {
+        if (platform.support(keyClassName, serializer, job)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("platform " + platform.name() + " support key class "
+              + keyClassName);
+          }
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Asks each platform in turn whether it defines the given key comparator.
+   *
+   * @param keyComparator comparator class to look for
+   * @return {@code true} as soon as one platform reports that it defines the
+   *         comparator, {@code false} if none does
+   */
+  public static boolean define(Class<?> keyComparator) {
+    synchronized (platforms) {
+      for (Platform platform : platforms) {
+        if (platform.define(keyComparator)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("platform " + platform.name() + " define comparator "
+              + keyComparator.getName());
+          }
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Task.Counter;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+/**
+ * Periodically asks {@link NativeRuntime} to report status to the MR
+ * framework through a {@link TaskReporter}, using a daemon thread.
+ */
+public class StatusReportChecker implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(StatusReportChecker.class);
+
+  /** Default reporting interval in milliseconds. */
+  public static int INTERVAL = 1000; // milli-seconds
+
+  private Thread checker;
+  private final TaskReporter reporter;
+  private final long interval;
+
+  /**
+   * Creates a checker that reports every {@link #INTERVAL} milliseconds.
+   *
+   * @param reporter reporter that receives the status updates
+   */
+  public StatusReportChecker(TaskReporter reporter) {
+    this(reporter, INTERVAL);
+  }
+
+  /**
+   * @param reporter reporter that receives the status updates
+   * @param interval time to sleep between two reports, in milliseconds
+   */
+  public StatusReportChecker(TaskReporter reporter, long interval) {
+    this.reporter = reporter;
+    this.interval = interval;
+  }
+
+  /**
+   * Sleeps for the configured interval and then reports the native status,
+   * repeating until the thread is interrupted or reporting fails with an
+   * {@link IOException}.
+   */
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        Thread.sleep(interval);
+      } catch (final InterruptedException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("StatusUpdater thread exiting since it got interrupted");
+        }
+        // Keep the interrupt visible to whoever is running this Runnable.
+        Thread.currentThread().interrupt();
+        break;
+      }
+      try {
+        NativeRuntime.reportStatus(reporter);
+      } catch (final IOException e) {
+        LOG.warn("Update native status got exception", e);
+        reporter.setStatus(e.toString());
+        break;
+      }
+    }
+  }
+
+  /**
+   * Looks up every counter used by the native side once, so that they will
+   * have the correct display name.
+   */
+  protected void initUsedCounters() {
+    reporter.getCounter(Counter.MAP_INPUT_RECORDS);
+    reporter.getCounter(Counter.MAP_OUTPUT_RECORDS);
+    reporter.getCounter(Counter.MAP_INPUT_BYTES);
+    reporter.getCounter(Counter.MAP_OUTPUT_BYTES);
+    reporter.getCounter(Counter.MAP_OUTPUT_MATERIALIZED_BYTES);
+    reporter.getCounter(Counter.COMBINE_INPUT_RECORDS);
+    reporter.getCounter(Counter.COMBINE_OUTPUT_RECORDS);
+    reporter.getCounter(Counter.REDUCE_INPUT_RECORDS);
+    reporter.getCounter(Counter.REDUCE_OUTPUT_RECORDS);
+    reporter.getCounter(Counter.REDUCE_INPUT_GROUPS);
+    reporter.getCounter(Counter.SPILLED_RECORDS);
+  }
+
+  /**
+   * Starts the daemon reporting thread. Calls after the first one are
+   * ignored, including calls made after {@link #stop()}.
+   */
+  public synchronized void start() {
+    if (checker == null) {
+      // init counters used by native side,
+      // so they will have correct display name
+      initUsedCounters();
+      checker = new Thread(this);
+      checker.setDaemon(true);
+      checker.start();
+    }
+  }
+
+  /**
+   * Interrupts the reporting thread, if it was started, and waits for it to
+   * finish.
+   *
+   * @throws InterruptedException if the calling thread is interrupted while
+   *           waiting for the reporting thread to end
+   */
+  public synchronized void stop() throws InterruptedException {
+    if (checker != null) {
+      checker.interrupt();
+      checker.join();
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+/**
+ * Bundles the job configuration, the input/output key and value classes, the
+ * task reporter and the task attempt id of one task. The four classes can be
+ * replaced after construction; everything else is fixed.
+ */
+public class TaskContext {
+  private final JobConf conf;
+  private Class iKClass;
+  private Class iVClass;
+  private Class oKClass;
+  private Class oVClass;
+  private final TaskReporter reporter;
+  private final TaskAttemptID taskAttemptID;
+
+  /**
+   * @param conf job configuration
+   * @param iKClass input key class
+   * @param iVClass input value class
+   * @param oKClass output key class
+   * @param oVClass output value class
+   * @param reporter task reporter
+   * @param id task attempt id
+   */
+  public TaskContext(JobConf conf, Class iKClass, Class iVClass, Class oKClass, Class oVClass, TaskReporter reporter,
+      TaskAttemptID id) {
+    this.conf = conf;
+    this.iKClass = iKClass;
+    this.iVClass = iVClass;
+    this.oKClass = oKClass;
+    this.oVClass = oVClass;
+    this.reporter = reporter;
+    this.taskAttemptID = id;
+  }
+
+  /** @return the input key class */
+  public Class getInputKeyClass() {
+    return iKClass;
+  }
+
+  /** @param klass the new input key class */
+  public void setInputKeyClass(Class klass) {
+    this.iKClass = klass;
+  }
+
+  /** @return the input value class */
+  public Class getInputValueClass() {
+    return iVClass;
+  }
+
+  /** @param klass the new input value class */
+  public void setInputValueClass(Class klass) {
+    this.iVClass = klass;
+  }
+
+  /** @return the output key class */
+  public Class getOutputKeyClass() {
+    return this.oKClass;
+  }
+
+  /**
+   * Misspelled predecessor of {@link #getOutputKeyClass()}, kept so existing
+   * callers keep compiling.
+   *
+   * @return the output key class
+   * @deprecated use {@link #getOutputKeyClass()}
+   */
+  @Deprecated
+  public Class getOuputKeyClass() {
+    return getOutputKeyClass();
+  }
+
+  /** @param klass the new output key class */
+  public void setOutputKeyClass(Class klass) {
+    this.oKClass = klass;
+  }
+
+  /** @return the output value class */
+  public Class getOutputValueClass() {
+    return this.oVClass;
+  }
+
+  /** @param klass the new output value class */
+  public void setOutputValueClass(Class klass) {
+    this.oVClass = klass;
+  }
+
+  /** @return the task reporter given at construction */
+  public TaskReporter getTaskReporter() {
+    return this.reporter;
+  }
+
+  /** @return the task attempt id given at construction */
+  public TaskAttemptID getTaskAttemptId() {
+    return this.taskAttemptID;
+  }
+
+  /** @return the job configuration given at construction */
+  public JobConf getConf() {
+    return this.conf;
+  }
+
+  /**
+   * @return a new context holding the same references as this one; the
+   *         configuration, reporter and attempt id objects are shared, not
+   *         cloned
+   */
+  public TaskContext copyOf() {
+    return new TaskContext(conf, iKClass, iVClass, oKClass, oVClass, reporter, taskAttemptID);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+/**
+ * Kinds of buffer distinguished by the native task buffer classes.
+ */
+public enum BufferType {
+  /** NOTE(review): presumably a direct (off-heap) NIO buffer; confirm with the users of this enum. */
+  DIRECT_BUFFER,
+  /** NOTE(review): presumably a buffer backed by a Java heap array; confirm with the users of this enum. */
+  HEAP_BUFFER
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+/**
+ * read data from a input buffer
+ */
+public class ByteBufferDataReader extends DataInputStream {
+  private ByteBuffer byteBuffer;
+  private char lineCache[];
+
+  public ByteBufferDataReader(InputBuffer buffer) {
+    if (buffer != null) {
+      this.byteBuffer = buffer.getByteBuffer();
+    }
+  }
+
+  public void reset(InputBuffer buffer) {
+    this.byteBuffer = buffer.getByteBuffer();
+  }
+
+  @Override
+  public int read() throws IOException {
+    return byteBuffer.get();
+  }
+
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    byteBuffer.get(b, off, len);
+    return len;
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    byteBuffer.get(b, 0, b.length);
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    byteBuffer.get(b, off, len);
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    final int remains = byteBuffer.remaining();
+    final int skip = (remains < n) ? remains : n;
+    final int current = byteBuffer.position();
+    byteBuffer.position(current + skip);
+    return skip;
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    return (byteBuffer.get() == 1) ? true : false;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return byteBuffer.get();
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    final int ch = byteBuffer.get();
+    if (ch < 0) {
+      throw new EOFException();
+    }
+    return ch;
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return byteBuffer.getShort();
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return byteBuffer.getShort();
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    return byteBuffer.getChar();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return byteBuffer.getInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return byteBuffer.getLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return byteBuffer.getFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return byteBuffer.getDouble();
+  }
+
+  @Override
+  public String readLine() throws IOException {
+
+    InputStream in = this;
+
+    char buf[] = lineCache;
+
+    if (buf == null) {
+      buf = lineCache = new char[128];
+    }
+
+    int room = buf.length;
+    int offset = 0;
+    int c;
+
+    loop: while (true) {
+      switch (c = in.read()) {
+      case -1:
+      case '\n':
+        break loop;
+
+      case '\r':
+        final int c2 = in.read();
+        if ((c2 != '\n') && (c2 != -1)) {
+          if (!(in instanceof PushbackInputStream)) {
+            in = new PushbackInputStream(in);
+          }
+          ((PushbackInputStream) in).unread(c2);
+        }
+        break loop;
+
+      default:
+        if (--room < 0) {
+          buf = new char[offset + 128];
+          room = buf.length - offset - 1;
+          System.arraycopy(lineCache, 0, buf, 0, offset);
+          lineCache = buf;
+        }
+        buf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(buf, 0, offset);
+  }
+
+  @Override
+  public final String readUTF() throws IOException {
+    return readUTF(this);
+  }
+
+  private final static String readUTF(DataInput in) throws IOException {
+    final int utflen = in.readUnsignedShort();
+    byte[] bytearr = null;
+    char[] chararr = null;
+
+    bytearr = new byte[utflen];
+    chararr = new char[utflen];
+
+    int c, char2, char3;
+    int count = 0;
+    int chararr_count = 0;
+
+    in.readFully(bytearr, 0, utflen);
+
+    while (count < utflen) {
+      c = bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararr_count++] = (char) c;
+    }
+
+    while (count < utflen) {
+      c = bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+        /* 0xxxxxxx */
+        count++;
+        chararr[chararr_count++] = (char) c;
+        break;
+      case 12:
+      case 13:
+        /* 110x xxxx 10xx xxxx */
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException("malformed input: partial character at end");
+        }
+        char2 = bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException("malformed input around byte " + count);
+        }
+        chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+        break;
+      case 14:
+        /* 1110 xxxx 10xx xxxx 10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException("malformed input: partial character at end");
+        }
+        char2 = bytearr[count - 2];
+        char3 = bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+        }
+        chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+        /* 10xx xxxx, 1111 xxxx */
+        throw new UTFDataFormatException("malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararr_count);
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  @Override
+  public boolean hasUnReadData() {
+    return null != byteBuffer && byteBuffer.hasRemaining();
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+
+/**
+ * Writes data into an output buffer through the {@link DataOutput} interface.
+ * When the buffer cannot take more data it is handed to the
+ * {@link NativeDataTarget} with {@code sendData()} and then refilled from
+ * position 0.
+ */
+public class ByteBufferDataWriter extends DataOutputStream {
+  private static final byte TRUE = (byte) 1;
+  private static final byte FALSE = (byte) 0;
+
+  private ByteBuffer buffer;
+  private final NativeDataTarget target;
+
+  /**
+   * @param handler target whose output buffer is written to and which
+   *          receives the flushed data; when {@code null} no buffer is
+   *          attached and writes fail with a {@link NullPointerException}
+   */
+  public ByteBufferDataWriter(NativeDataTarget handler) {
+    if (null != handler) {
+      this.buffer = handler.getOutputBuffer().getByteBuffer();
+    }
+    this.target = handler;
+  }
+
+  /**
+   * Flushes first when data is pending and fewer than {@code length} bytes
+   * are free, so that a fixed-size value is not split across two flushes.
+   */
+  private void checkSizeAndFlushNecessary(int length) throws IOException {
+    if (buffer.position() > 0 && buffer.remaining() < length) {
+      flush();
+    }
+  }
+
+  @Override
+  public synchronized void write(int v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put((byte) v);
+  }
+
+  /** @return {@code true} if fewer than {@code dataLength} bytes are free in the buffer */
+  @Override
+  public boolean shortOfSpace(int dataLength) throws IOException {
+    return buffer.remaining() < dataLength;
+  }
+
+  /**
+   * Writes {@code len} bytes of {@code b} starting at {@code off}, flushing
+   * whenever the buffer is full, so the data may span several flushes.
+   */
+  @Override
+  public synchronized void write(byte b[], int off, int len) throws IOException {
+    int remain = len;
+    int offset = off;
+    while (remain > 0) {
+      if (!buffer.hasRemaining()) {
+        flush();
+        continue;
+      }
+      final int chunk = Math.min(buffer.remaining(), remain);
+      buffer.put(b, offset, chunk);
+      remain -= chunk;
+      offset += chunk;
+    }
+  }
+
+  /** Hands the buffered data to the target and rewinds the buffer to position 0. */
+  @Override
+  public void flush() throws IOException {
+    target.sendData();
+    buffer.position(0);
+  }
+
+  /** Flushes pending data, if any, and then tells the target that sending is finished. */
+  @Override
+  public void close() throws IOException {
+    if (hasUnFlushedData()) {
+      flush();
+    }
+    target.finishSendData();
+  }
+
+  @Override
+  public final void writeBoolean(boolean v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put(v ? TRUE : FALSE);
+  }
+
+  @Override
+  public final void writeByte(int v) throws IOException {
+    checkSizeAndFlushNecessary(1);
+    buffer.put((byte) v);
+  }
+
+  @Override
+  public final void writeShort(int v) throws IOException {
+    checkSizeAndFlushNecessary(2);
+    buffer.putShort((short) v);
+  }
+
+  @Override
+  public final void writeChar(int v) throws IOException {
+    checkSizeAndFlushNecessary(2);
+    // Same accessor as writeChars, so both honor the buffer's byte order;
+    // for a big-endian buffer (the ByteBuffer default) this writes the high
+    // byte first, exactly like the two explicit puts used previously.
+    buffer.putChar((char) v);
+  }
+
+  @Override
+  public final void writeInt(int v) throws IOException {
+    checkSizeAndFlushNecessary(4);
+    buffer.putInt(v);
+  }
+
+  @Override
+  public final void writeLong(long v) throws IOException {
+    checkSizeAndFlushNecessary(8);
+    buffer.putLong(v);
+  }
+
+  @Override
+  public final void writeFloat(float v) throws IOException {
+    // writeInt performs the space check.
+    writeInt(Float.floatToIntBits(v));
+  }
+
+  @Override
+  public final void writeDouble(double v) throws IOException {
+    // writeLong performs the space check.
+    writeLong(Double.doubleToLongBits(v));
+  }
+
+  /** Writes the low-order byte of every char of {@code s}, flushing whenever the buffer is full. */
+  @Override
+  public final void writeBytes(String s) throws IOException {
+    int remain = s.length();
+    int offset = 0;
+    while (remain > 0) {
+      if (!buffer.hasRemaining()) {
+        flush();
+        continue;
+      }
+      final int chunk = Math.min(buffer.remaining(), remain);
+      for (int i = 0; i < chunk; i++) {
+        buffer.put((byte) s.charAt(offset + i));
+      }
+      remain -= chunk;
+      offset += chunk;
+    }
+  }
+
+  /** Writes every char of {@code s} as two bytes, flushing whenever fewer than two bytes are free. */
+  @Override
+  public final void writeChars(String s) throws IOException {
+    int remain = s.length();
+    int offset = 0;
+    while (remain > 0) {
+      // Two free bytes are enough for one char; requiring more would flush
+      // needlessly and never make progress on a two-byte buffer.
+      if (buffer.remaining() < 2) {
+        flush();
+        continue;
+      }
+      final int chunk = Math.min(buffer.remaining() / 2, remain);
+      for (int i = 0; i < chunk; i++) {
+        buffer.putChar(s.charAt(offset + i));
+      }
+      remain -= chunk;
+      offset += chunk;
+    }
+  }
+
+  /**
+   * Writes {@code str} in modified UTF-8: a two-byte length (high byte first)
+   * followed by the encoded bytes.
+   *
+   * @throws UTFDataFormatException if the encoded form is longer than 65535
+   *           bytes
+   */
+  @Override
+  public final void writeUTF(String str) throws IOException {
+    final int strlen = str.length();
+
+    /* use charAt instead of copying String to char array */
+    int utflen = 0;
+    for (int i = 0; i < strlen; i++) {
+      final int c = str.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        utflen++;
+      } else if (c > 0x07FF) {
+        utflen += 3;
+      } else {
+        utflen += 2;
+      }
+    }
+
+    if (utflen > 65535) {
+      throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+    }
+
+    final byte[] bytearr = new byte[utflen + 2];
+    int count = 0;
+    // NOTE(review): the length prefix is always big-endian, while the
+    // reader's readUnsignedShort uses the buffer's byte order; the two agree
+    // only for big-endian buffers.
+    bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+    bytearr[count++] = (byte) (utflen & 0xFF);
+
+    for (int i = 0; i < strlen; i++) {
+      final int c = str.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        bytearr[count++] = (byte) c;
+      } else if (c > 0x07FF) {
+        bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+        bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+        bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+      } else {
+        // Covers U+0000 as well, which modified UTF-8 encodes in two bytes.
+        bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+        bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+      }
+    }
+    write(bytearr, 0, bytearr.length);
+  }
+
+  /** @return {@code true} if the buffer position is not 0, i.e. data was written since the last flush */
+  @Override
+  public boolean hasUnFlushedData() {
+    return buffer.position() != 0;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataInput;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} that also implements {@link DataInput} and can
+ * report whether unread data remains.
+ */
+public abstract class DataInputStream extends InputStream implements DataInput {
+  /**
+   * Check whether there is unread data left in the stream
+   *
+   * @return whether unread data remains; {@code ByteBufferDataReader}
+   *         implements this as "a buffer is attached and has bytes remaining"
+   */
+  public abstract boolean hasUnReadData();
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Base class for the output streams of this package: an {@link OutputStream}
+ * that also offers the typed write methods of {@link DataOutput} and exposes
+ * the state of its space and of its unflushed data.
+ */
+public abstract class DataOutputStream extends OutputStream implements DataOutput {
+
+  /**
+   * Check whether this stream is short of space for {@code length} more bytes
+   *
+   * @param length
+   *          number of bytes the caller intends to write
+   * @return presumably {@code true} when {@code length} bytes do not fit into
+   *         the space that is left; the exact semantics are defined by the
+   *         implementing subclass
+   * @throws IOException
+   *           if the implementing subclass fails while checking
+   */
+  public abstract boolean shortOfSpace(int length) throws IOException;
+
+  /**
+   * Check whether there is unflushed data stored in the stream
+   *
+   * @return presumably {@code true} when written data is still waiting to be
+   *         flushed; the exact semantics are defined by the implementing
+   *         subclass
+   */
+  public abstract boolean hasUnFlushedData();
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Pool of direct {@link ByteBuffer}s grouped by capacity, so that a direct
+ * buffer can be reused instead of allocating a new one for every request.
+ *
+ * The pool only keeps {@link WeakReference}s to the returned buffers. A
+ * referent that has been cleared is skipped by {@link #borrowBuffer(int)},
+ * which then falls back to allocating a fresh buffer.
+ */
+public class DirectBufferPool {
+
+  private static final Log LOG = LogFactory.getLog(DirectBufferPool.class);
+
+  /** Lazily created singleton, guarded by the class lock. */
+  private static DirectBufferPool directBufferPool = null;
+
+  /** Returned buffers, keyed by buffer capacity. */
+  private final ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> bufferMap =
+      new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();
+
+  private DirectBufferPool() {
+  }
+
+  /**
+   * @return the shared pool, created on first use
+   */
+  public static synchronized DirectBufferPool getInstance() {
+    if (null == directBufferPool) {
+      directBufferPool = new DirectBufferPool();
+    }
+    return directBufferPool;
+  }
+
+  /**
+   * Drop the shared pool; the next {@link #getInstance()} creates a new, empty
+   * one. The (misspelled) method name is kept for existing callers.
+   */
+  public static synchronized void destoryInstance() {
+    // Takes the same class lock as getInstance() so that the reset is visible
+    // to every thread that asks for the instance afterwards.
+    directBufferPool = null;
+  }
+
+  /**
+   * Hand out a direct buffer of exactly {@code capacity} bytes, reusing a
+   * pooled one when available.
+   *
+   * @param capacity
+   *          capacity of the requested buffer in bytes
+   * @return a pooled buffer of that capacity, or a newly allocated direct
+   *         buffer when the pool has none
+   * @throws IOException
+   *           declared for callers; not thrown by this implementation
+   */
+  public synchronized ByteBuffer borrowBuffer(int capacity) throws IOException {
+    final Queue<WeakReference<ByteBuffer>> pooled = bufferMap.get(capacity);
+    if (null != pooled) {
+      WeakReference<ByteBuffer> ref;
+      while ((ref = pooled.poll()) != null) {
+        final ByteBuffer buf = ref.get();
+        // a null referent means the buffer was collected; try the next entry
+        if (buf != null) {
+          return buf;
+        }
+      }
+    }
+    return ByteBuffer.allocateDirect(capacity);
+  }
+
+  /**
+   * Give a direct buffer back to the pool. The buffer is cleared (position 0,
+   * limit = capacity) before it is stored.
+   *
+   * @param buffer
+   *          the direct buffer to return
+   * @throws IOException
+   *           if {@code buffer} is null or not a direct buffer
+   */
+  public void returnBuffer(ByteBuffer buffer) throws IOException {
+    if (null == buffer || !buffer.isDirect()) {
+      throw new IOException("the buffer is null or the buffer returned is not direct buffer");
+    }
+
+    buffer.clear();
+    final int capacity = buffer.capacity();
+    Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
+    if (null == list) {
+      list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
+      // another thread may have registered a queue in the meantime; use that one
+      final Queue<WeakReference<ByteBuffer>> prev = bufferMap.putIfAbsent(capacity, list);
+      if (prev != null) {
+        list = prev;
+      }
+    }
+    list.add(new WeakReference<ByteBuffer>(buffer));
+  }
+
+  /**
+   * @param capacity
+   *          buffer capacity in bytes
+   * @return number of references currently queued for {@code capacity}
+   *         (cleared references that were not polled yet are counted too), or
+   *         0 when no buffer of that capacity was ever returned
+   */
+  int getBufCountsForCapacity(int capacity) {
+    final Queue<WeakReference<ByteBuffer>> pooled = bufferMap.get(capacity);
+    return null == pooled ? 0 : pooled.size();
+  }
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Big-endian {@link ByteBuffer} used for reading, either heap based or a
+ * direct buffer borrowed from {@link DirectBufferPool}.
+ *
+ * No buffer is allocated for a non-positive size or an empty array; in that
+ * state the accessors report 0 (or null) instead of failing.
+ */
+public class InputBuffer {
+
+  private ByteBuffer byteBuffer;
+  private final BufferType type;
+
+  /**
+   * Create a buffer of {@code inputSize} bytes that starts out empty
+   * (position 0, limit 0).
+   *
+   * @param type
+   *          kind of buffer to allocate
+   * @param inputSize
+   *          capacity in bytes; nothing is allocated when it is not positive
+   * @throws IOException
+   *           if borrowing a direct buffer from the pool fails
+   */
+  public InputBuffer(BufferType type, int inputSize) throws IOException {
+
+    final int capacity = inputSize;
+    this.type = type;
+
+    if (capacity > 0) {
+
+      switch (type) {
+      case DIRECT_BUFFER:
+        this.byteBuffer = DirectBufferPool.getInstance().borrowBuffer(capacity);
+        this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+        break;
+      case HEAP_BUFFER:
+        this.byteBuffer = ByteBuffer.allocate(capacity);
+        this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+        break;
+      }
+      byteBuffer.position(0);
+      byteBuffer.limit(0);
+    }
+  }
+
+  public BufferType getType() {
+    return this.type;
+  }
+
+  /**
+   * Wrap an existing array as a heap buffer that starts out empty
+   * (position 0, limit 0).
+   *
+   * @param bytes
+   *          backing array; nothing is wrapped when it is empty
+   */
+  public InputBuffer(byte[] bytes) {
+    this.type = BufferType.HEAP_BUFFER;
+    if (bytes.length > 0) {
+      this.byteBuffer = ByteBuffer.wrap(bytes);
+      this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+      byteBuffer.position(0);
+      byteBuffer.limit(0);
+    }
+  }
+
+  /**
+   * @return the underlying buffer, or null when none was allocated
+   */
+  public ByteBuffer getByteBuffer() {
+    return this.byteBuffer;
+  }
+
+  /**
+   * @return the limit of the underlying buffer, 0 when there is none
+   */
+  public int length() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.limit();
+  }
+
+  /**
+   * Make the bytes from {@code startOffset} up to {@code length} readable.
+   *
+   * @param startOffset
+   *          new position of the buffer
+   * @param length
+   *          new limit of the buffer
+   */
+  public void rewind(int startOffset, int length) {
+    if (null == byteBuffer) {
+      return;
+    }
+    // The limit has to be moved first: Buffer.position(int) rejects any value
+    // above the CURRENT limit, so positioning first failed whenever the
+    // buffer had a smaller limit than startOffset (e.g. a freshly created
+    // buffer, whose limit is 0).
+    byteBuffer.limit(length);
+    // A start beyond the new limit ends up at the limit, exactly where
+    // Buffer.limit(int) used to leave the position in that case.
+    byteBuffer.position(Math.min(startOffset, length));
+  }
+
+  /**
+   * @return number of bytes between position and limit, 0 without a buffer
+   */
+  public int remaining() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.remaining();
+  }
+
+  /**
+   * @return current position, 0 without a buffer
+   */
+  public int position() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.position();
+  }
+
+  /**
+   * Move the position of the buffer.
+   *
+   * @param pos
+   *          new position
+   * @return {@code pos}, or 0 when there is no buffer to position
+   */
+  public int position(int pos) {
+    if (null == byteBuffer) {
+      return 0;
+    }
+
+    byteBuffer.position(pos);
+    return pos;
+  }
+
+  /**
+   * @return capacity of the underlying buffer, 0 when there is none
+   */
+  public int capacity() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.capacity();
+  }
+
+  /**
+   * @return the array backing the buffer, or null when there is no buffer.
+   *         {@link ByteBuffer#array()} throws for buffers without an
+   *         accessible backing array, so this is only usable for heap buffers.
+   */
+  public byte[] array() {
+    if (null == byteBuffer) {
+      return null;
+    }
+    return byteBuffer.array();
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Big-endian {@link ByteBuffer} used for writing, either heap based or
+ * direct.
+ *
+ * No buffer is allocated for a non-positive capacity or an empty array; in
+ * that state the accessors report 0 instead of failing.
+ */
+public class OutputBuffer {
+  protected ByteBuffer byteBuffer;
+  private final BufferType type;
+
+  /**
+   * Allocate a buffer of the given kind.
+   *
+   * @param type
+   *          kind of buffer to allocate
+   * @param outputBufferCapacity
+   *          capacity in bytes; nothing is allocated when it is not positive
+   */
+  public OutputBuffer(BufferType type, int outputBufferCapacity) {
+
+    this.type = type;
+    if (outputBufferCapacity > 0) {
+      switch (type) {
+      case DIRECT_BUFFER:
+        this.byteBuffer = ByteBuffer.allocateDirect(outputBufferCapacity);
+        this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+        break;
+      case HEAP_BUFFER:
+        this.byteBuffer = ByteBuffer.allocate(outputBufferCapacity);
+        this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Wrap an existing array as a heap buffer positioned at 0.
+   *
+   * @param bytes
+   *          backing array; nothing is wrapped when it is empty
+   */
+  public OutputBuffer(byte[] bytes) {
+    this.type = BufferType.HEAP_BUFFER;
+    final int outputBufferCapacity = bytes.length;
+    if (outputBufferCapacity > 0) {
+      this.byteBuffer = ByteBuffer.wrap(bytes);
+      this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+      this.byteBuffer.position(0);
+    }
+  }
+
+  public BufferType getType() {
+    return this.type;
+  }
+
+  /**
+   * @return the underlying buffer, or null when none was allocated
+   */
+  public ByteBuffer getByteBuffer() {
+    return this.byteBuffer;
+  }
+
+  /**
+   * @return current position of the buffer (the number of bytes in front of
+   *         it), 0 when there is no buffer
+   */
+  public int length() {
+    // the constructors leave byteBuffer unset for an empty buffer; report 0
+    // like InputBuffer does instead of failing with a NullPointerException
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.position();
+  }
+
+  /**
+   * Move the position back to 0; does nothing when there is no buffer.
+   */
+  public void rewind() {
+    if (null == byteBuffer) {
+      return;
+    }
+    byteBuffer.position(0);
+  }
+
+  /**
+   * @return limit of the buffer, 0 when there is no buffer
+   */
+  public int limit() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.limit();
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter;
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * load data into a buffer signaled by a {@link BufferPuller}
+ *
+ * Each {@link #load()} serializes key/value pairs taken from a
+ * {@link RawKeyValueIterator} into the output buffer of the
+ * {@link NativeDataTarget}. A pair that does not fit any more is kept and
+ * written first by the next call.
+ */
+public class BufferPullee<IK, IV> implements IDataLoader {
+
+  public static final int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+  private final SizedWritable<IK> tmpInputKey;
+  private final SizedWritable<IV> tmpInputValue;
+  /** true while tmpInputKey/tmpInputValue hold a pair that was read but not written yet */
+  private boolean inputKVBuffered = false;
+  private RawKeyValueIterator rIter;
+  private ByteBufferDataWriter nativeWriter;
+  protected KVSerializer<IK, IV> serializer;
+  private final OutputBuffer outputBuffer;
+  private final NativeDataTarget target;
+  private boolean closed = false;
+
+  /**
+   * @param iKClass
+   *          key class; the serializer is only created when both classes are given
+   * @param iVClass
+   *          value class
+   * @param rIter
+   *          source of the key/value pairs
+   * @param target
+   *          owner of the output buffer the pairs are written to
+   * @throws IOException
+   *           if one of the helper objects cannot be created
+   */
+  public BufferPullee(Class<IK> iKClass, Class<IV> iVClass, RawKeyValueIterator rIter, NativeDataTarget target)
+      throws IOException {
+    this.rIter = rIter;
+    tmpInputKey = new SizedWritable<IK>(iKClass);
+    tmpInputValue = new SizedWritable<IV>(iVClass);
+
+    if (null != iKClass && null != iVClass) {
+      this.serializer = new KVSerializer<IK, IV>(iKClass, iVClass);
+    }
+    this.outputBuffer = target.getOutputBuffer();
+    this.target = target;
+  }
+
+  /**
+   * Write the next batch of key/value pairs into the output buffer.
+   *
+   * @return sum of the values returned by the serializer for the pairs
+   *         written by this call (presumably the number of bytes); 0 when
+   *         already closed
+   * @throws IOException
+   *           if no output buffer is available or reading/serializing fails
+   */
+  @Override
+  public int load() throws IOException {
+    if (closed) {
+      return 0;
+    }
+
+    if (null == outputBuffer) {
+      throw new IOException("output buffer not set");
+    }
+
+    this.nativeWriter = new ByteBufferDataWriter(target);
+    outputBuffer.rewind();
+
+    int written = 0;
+    boolean firstKV = true;
+
+    // a pair that did not fit into the previous batch goes in first
+    if (inputKVBuffered) {
+      written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue);
+      inputKVBuffered = false;
+      firstKV = false;
+    }
+
+    while (rIter.next()) {
+      tmpInputKey.readFields(rIter.getKey());
+      tmpInputValue.readFields(rIter.getValue());
+      serializer.updateLength(tmpInputKey, tmpInputValue);
+
+      final int kvSize = tmpInputKey.length + tmpInputValue.length + KV_HEADER_LENGTH;
+
+      // The first pair of a batch is always written, whatever its size;
+      // later pairs are postponed to the next batch when space runs short.
+      if (!firstKV && nativeWriter.shortOfSpace(kvSize)) {
+        inputKVBuffered = true;
+        break;
+      }
+      written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue);
+      firstKV = false;
+    }
+
+    if (nativeWriter.hasUnFlushedData()) {
+      nativeWriter.flush();
+    }
+    return written;
+  }
+
+  /**
+   * Close the iterator and the writer of the last batch. Calling it again has
+   * no effect.
+   *
+   * @throws IOException
+   *           if closing the iterator or the writer fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    // mark as closed up front and close the writer in a finally block, so a
+    // failing iterator neither leaks the writer nor leaves this half closed
+    closed = true;
+    try {
+      if (null != rIter) {
+        rIter.close();
+      }
+    } finally {
+      if (null != nativeWriter) {
+        nativeWriter.close();
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,187 @@
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.DataReceiver;
+import org.apache.hadoop.mapred.nativetask.NativeDataSource;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * actively signal a {@link BufferPullee} to load data into buffer and receive
+ *
+ * Records are read from the input buffer of the {@link NativeDataSource}. A
+ * record that arrives split over several deliveries is assembled in a
+ * separate heap buffer (the "aside" buffer) and served from there.
+ */
+public class BufferPuller implements RawKeyValueIterator, DataReceiver {
+
+  private static final Log LOG = LogFactory.getLog(BufferPuller.class);
+
+  public final static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+  byte[] keyBytes = new byte[0];
+  byte[] valueBytes = new byte[0];
+
+  private InputBuffer inputBuffer;
+  /** collects a record that does not fit into a single delivery */
+  private InputBuffer asideBuffer;
+
+  int remain = 0;
+
+  private ByteBufferDataReader nativeReader;
+
+  DataInputBuffer keyBuffer = new DataInputBuffer();
+  DataInputBuffer valueBuffer = new DataInputBuffer();
+
+  private boolean noMoreData = false;
+
+  private NativeDataSource input;
+  private boolean closed = false;
+
+  /**
+   * @param handler
+   *          data source whose input buffer is read and which is asked to
+   *          load more data once everything has been consumed
+   * @throws IOException
+   *           if the aside buffer cannot be created
+   */
+  public BufferPuller(NativeDataSource handler) throws IOException {
+    this.input = handler;
+    this.inputBuffer = handler.getInputBuffer();
+    nativeReader = new ByteBufferDataReader(null);
+    this.asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, inputBuffer.capacity());
+  }
+
+  /**
+   * @return buffer holding the key of the record fetched by the last
+   *         successful {@link #next()}
+   */
+  @Override
+  public DataInputBuffer getKey() throws IOException {
+    return keyBuffer;
+  }
+
+  /**
+   * @return buffer holding the value of the record fetched by the last
+   *         successful {@link #next()}
+   */
+  @Override
+  public DataInputBuffer getValue() throws IOException {
+    return valueBuffer;
+  }
+
+  /**
+   * Forget that the end of the data was reached, so that {@link #next()}
+   * asks the data source again.
+   */
+  public void reset() {
+    noMoreData = false;
+  }
+
+  /**
+   * Advance to the next record, asking the data source for more data when
+   * both buffers are used up.
+   *
+   * @return true if a record was read, false when closed or out of data
+   */
+  @Override
+  public boolean next() throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    if (noMoreData) {
+      return false;
+    }
+    final int asideRemain = asideBuffer.remaining();
+    final int inputRemain = inputBuffer.remaining();
+
+    if (asideRemain == 0 && inputRemain == 0) {
+      input.loadData();
+    }
+
+    // a record parked in the aside buffer precedes whatever follows it in the
+    // input buffer
+    if (asideBuffer.remaining() > 0) {
+      return nextKeyValue(asideBuffer);
+    } else if (inputBuffer.remaining() > 0) {
+      return nextKeyValue(inputBuffer);
+    } else {
+      noMoreData = true;
+      return false;
+    }
+  }
+
+  /**
+   * Read one record (key length, value length, key bytes, value bytes) from
+   * {@code buffer} into the key and value buffers.
+   */
+  private boolean nextKeyValue(InputBuffer buffer) throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    nativeReader.reset(buffer);
+
+    final int keyLength = nativeReader.readInt();
+    if (keyBytes.length < keyLength) {
+      keyBytes = new byte[keyLength];
+    }
+
+    final int valueLength = nativeReader.readInt();
+    if (valueBytes.length < valueLength) {
+      valueBytes = new byte[valueLength];
+    }
+
+    nativeReader.read(keyBytes, 0, keyLength);
+    nativeReader.read(valueBytes, 0, valueLength);
+
+    keyBuffer.reset(keyBytes, keyLength);
+    valueBuffer.reset(valueBytes, valueLength);
+
+    return true;
+  }
+
+  /**
+   * Take note of data that was placed into the input buffer. Bytes that
+   * continue a record being assembled are moved to the aside buffer; if the
+   * record at the front of the input is incomplete, it is moved there as well
+   * to await the rest.
+   *
+   * @return false when closed, true otherwise
+   * @throws IOException
+   *           if fewer bytes than a record header are left in the input
+   */
+  @Override
+  public boolean receiveData() throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    final ByteBuffer in = inputBuffer.getByteBuffer();
+
+    if (null != asideBuffer && asideBuffer.length() > 0) {
+      if (asideBuffer.remaining() > 0) {
+        final byte[] aside = asideBuffer.getByteBuffer().array();
+        final int toCopy = Math.min(asideBuffer.remaining(), in.remaining());
+        in.get(aside, asideBuffer.position(), toCopy);
+        asideBuffer.position(asideBuffer.position() + toCopy);
+
+        // Rewind only a record that has just been completed. The check used
+        // to run for every call, so an aside buffer that next() had already
+        // drained (position == limit) was rewound as well and its old record
+        // was handed out again after each following load.
+        if (asideBuffer.remaining() == 0) {
+          asideBuffer.position(0);
+        }
+      }
+    }
+
+    if (in.remaining() == 0) {
+      return true;
+    }
+
+    if (in.remaining() < KV_HEADER_LENGTH) {
+      throw new IOException("incomplete data, input length is: " + in.remaining());
+    }
+    // peek at the header of the first record without consuming it
+    final int position = in.position();
+    final int keyLength = in.getInt();
+    final int valueLength = in.getInt();
+    in.position(position);
+    final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH;
+    final int remaining = in.remaining();
+
+    if (kvLength > remaining) {
+      // the record continues in later deliveries: park what is here
+      if (null == asideBuffer || asideBuffer.capacity() < kvLength) {
+        asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength);
+      }
+      asideBuffer.rewind(0, kvLength);
+
+      in.get(asideBuffer.array(), 0, remaining);
+      asideBuffer.position(remaining);
+    }
+    return true;
+  }
+
+  /**
+   * @return always null, progress is not tracked here
+   */
+  @Override
+  public Progress getProgress() {
+    return null;
+  }
+
+  /**
+   * Closes the iterator so that the underlying streams can be closed.
+   *
+   * @throws IOException
+   *           if closing the reader fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    if (null != nativeReader) {
+      nativeReader.close();
+    }
+    closed = true;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * collect data when signaled
+ *
+ * Records found in a delivered buffer are deserialized and handed to a
+ * {@link RecordWriter}. A record that arrives split over several deliveries
+ * is assembled in a separate heap buffer (the "aside" buffer) first.
+ */
+public class BufferPushee<OK, OV> implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(BufferPushee.class);
+
+  public final static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+  /** collects a record that does not fit into a single delivery */
+  private InputBuffer asideBuffer;
+  private final SizedWritable<OK> tmpOutputKey;
+  private final SizedWritable<OV> tmpOutputValue;
+  private RecordWriter<OK, OV> writer;
+  private ByteBufferDataReader nativeReader;
+
+  private KVSerializer<OK, OV> deserializer;
+  private boolean closed = false;
+
+  /**
+   * @param oKClass
+   *          key class; the deserializer is only created when both classes are given
+   * @param oVClass
+   *          value class
+   * @param writer
+   *          receiver of the deserialized records
+   * @throws IOException
+   *           if one of the helper objects cannot be created
+   */
+  public BufferPushee(Class<OK> oKClass, Class<OV> oVClass, RecordWriter<OK, OV> writer) throws IOException {
+    tmpOutputKey = new SizedWritable<OK>(oKClass);
+    tmpOutputValue = new SizedWritable<OV>(oVClass);
+
+    this.writer = writer;
+
+    if (null != oKClass && null != oVClass) {
+      this.deserializer = new KVSerializer<OK, OV>(oKClass, oVClass);
+    }
+    this.nativeReader = new ByteBufferDataReader(null);
+  }
+
+  /**
+   * Consume the data delivered in {@code buffer}: finish a record that is
+   * being assembled, then either write out the records in the buffer or, if
+   * the first one is incomplete, park it until the rest arrives.
+   *
+   * @param buffer
+   *          delivered data
+   * @return false when closed, true otherwise
+   * @throws IOException
+   *           if fewer bytes than a record header are left, or writing fails
+   */
+  public boolean collect(InputBuffer buffer) throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    final ByteBuffer input = buffer.getByteBuffer();
+    if (null != asideBuffer && asideBuffer.length() > 0) {
+      if (asideBuffer.remaining() > 0) {
+        final byte[] output = asideBuffer.getByteBuffer().array();
+        final int write = Math.min(asideBuffer.remaining(), input.remaining());
+        input.get(output, asideBuffer.position(), write);
+        asideBuffer.position(asideBuffer.position() + write);
+      }
+
+      // the parked record is complete: write it out and empty the aside buffer
+      if (asideBuffer.remaining() == 0 && asideBuffer.position() > 0) {
+        asideBuffer.position(0);
+        write(asideBuffer);
+        asideBuffer.rewind(0, 0);
+      }
+    }
+
+    if (input.remaining() == 0) {
+      return true;
+    }
+
+    if (input.remaining() < KV_HEADER_LENGTH) {
+      throw new IOException("incomplete data, input length is: " + input.remaining());
+    }
+    // peek at the header of the first record without consuming it
+    final int position = input.position();
+    final int keyLength = input.getInt();
+    final int valueLength = input.getInt();
+    input.position(position);
+    final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH;
+    final int remaining = input.remaining();
+
+    if (kvLength > remaining) {
+      // the record continues in later deliveries: park what is here
+      if (null == asideBuffer || asideBuffer.capacity() < kvLength) {
+        asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength);
+      }
+      asideBuffer.rewind(0, kvLength);
+
+      input.get(asideBuffer.array(), 0, remaining);
+      asideBuffer.position(remaining);
+    } else {
+      write(buffer);
+    }
+    return true;
+  }
+
+  /**
+   * Deserialize every record between position and limit of {@code input} and
+   * pass it to the record writer.
+   *
+   * @throws IOException
+   *           if the size of the records read differs from the number of
+   *           bytes that were available
+   */
+  @SuppressWarnings("unchecked")
+  private boolean write(InputBuffer input) throws IOException {
+    if (closed) {
+      return false;
+    }
+    int totalRead = 0;
+    final int remain = input.remaining();
+    this.nativeReader.reset(input);
+    while (remain > totalRead) {
+      final int read = deserializer.deserializeKV(nativeReader, tmpOutputKey, tmpOutputValue);
+      if (read == 0) {
+        // Nothing was deserialized although bytes are still expected. Going
+        // round again could repeat this forever, so stop and let the size
+        // check below report the mismatch.
+        break;
+      }
+      totalRead += read;
+      writer.write((OK) (tmpOutputKey.v), (OV) (tmpOutputValue.v));
+    }
+    if (remain != totalRead) {
+      throw new IOException("We expect to read " + remain + ", but we actually read: " + totalRead);
+    }
+    return true;
+  }
+
+  /**
+   * Close the record writer and the reader. Calling it again has no effect.
+   *
+   * @throws IOException
+   *           if closing the writer or the reader fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    // mark as closed up front and close the reader in a finally block, so a
+    // failing writer neither leaks the reader nor leaves this half closed
+    closed = true;
+    try {
+      if (null != writer) {
+        writer.close(null);
+      }
+    } finally {
+      if (null != nativeReader) {
+        nativeReader.close();
+      }
+    }
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter;
+import org.apache.hadoop.mapred.nativetask.serde.IKVSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * actively push data into a buffer and signal a {@link BufferPushee} to collect it
+ *
+ * <p>Key/value pairs handed to {@code collect} are serialized into a
+ * {@link ByteBufferDataWriter} that is bound to the {@link NativeDataTarget} given at
+ * construction time.
+ */
+public class BufferPusher<K, V> implements OutputCollector<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(BufferPusher.class);
+
+  private final SizedWritable<K> tmpInputKey;
+  private final SizedWritable<V> tmpInputValue;
+  private ByteBufferDataWriter out;
+  IKVSerializer serializer;
+  private boolean closed = false;
+
+  /**
+   * @param iKClass key class; the serializer is only created when both classes are non-null
+   * @param iVClass value class; the serializer is only created when both classes are non-null
+   * @param target destination the underlying {@link ByteBufferDataWriter} writes to
+   * @throws IOException if a collaborator fails to initialize
+   */
+  public BufferPusher(Class<K> iKClass, Class<V> iVClass, NativeDataTarget target) throws IOException {
+    tmpInputKey = new SizedWritable<K>(iKClass);
+    tmpInputValue = new SizedWritable<V>(iVClass);
+
+    if (null != iKClass && null != iVClass) {
+      this.serializer = new KVSerializer<K, V>(iKClass, iVClass);
+    }
+    this.out = new ByteBufferDataWriter(target);
+  }
+
+  /**
+   * Serializes one key/value pair together with its partition number. Does nothing once this
+   * pusher has been closed, matching {@link #collect(Object, Object)}.
+   *
+   * @param key key to serialize
+   * @param value value to serialize
+   * @param partition partition number written alongside the pair
+   * @throws IOException if serialization fails
+   */
+  public void collect(K key, V value, int partition) throws IOException {
+    if (closed) {
+      // Same guard as the two-argument collect: never write into a closed writer.
+      return;
+    }
+    tmpInputKey.reset(key);
+    tmpInputValue.reset(value);
+    serializer.serializePartitionKV(out, partition, tmpInputKey, tmpInputValue);
+  }
+
+  /**
+   * Serializes one key/value pair. Does nothing once this pusher has been closed.
+   *
+   * @param key key to serialize
+   * @param value value to serialize
+   * @throws IOException if serialization fails
+   */
+  @Override
+  public void collect(K key, V value) throws IOException {
+    if (closed) {
+      return;
+    }
+    tmpInputKey.reset(key);
+    tmpInputValue.reset(value);
+    serializer.serializeKV(out, tmpInputKey, tmpInputValue);
+  }
+
+  /**
+   * Flushes the underlying writer, but only when it reports unflushed data.
+   *
+   * @throws IOException if the flush fails
+   */
+  public void flush() throws IOException {
+    if (null != out && out.hasUnFlushedData()) {
+      out.flush();
+    }
+  }
+
+  /**
+   * Closes the underlying writer. Calling this method again after a successful close is a no-op.
+   *
+   * @throws IOException if closing the writer fails
+   */
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    if (null != out) {
+      out.close();
+    }
+    closed = true;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task.CombinerRunner;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.DataChannel;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher {
+
+  public static String NAME = "NativeTask.CombineHandler";
+  private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
+  public static Command LOAD = new Command(1, "Load");
+  public static Command COMBINE = new Command(4, "Combine");
+  public final CombinerRunner<K, V> combinerRunner;
+
+  private final INativeHandler nativeHandler;
+  private final BufferPuller puller;
+  private final BufferPusher<K, V> kvPusher;
+  private boolean closed = false;
+
+  public static <K, V> ICombineHandler create(TaskContext context) throws IOException, ClassNotFoundException {
+    final JobConf conf = new JobConf(context.getConf());
+    conf.set(Constants.SERIALIZATION_FRAMEWORK,
+        String.valueOf(SerializationFramework.WRITABLE_SERIALIZATION.getType()));
+    String combinerClazz = conf.get(Constants.MAPRED_COMBINER_CLASS);
+    if (null == combinerClazz) {
+      combinerClazz = conf.get(MRJobConfig.COMBINE_CLASS_ATTR);
+    }
+
+    if (null == combinerClazz) {
+      return null;
+    } else {
+      LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz);
+    }
+
+    final Counter combineInputCounter = context.getTaskReporter().getCounter(COMBINE_INPUT_RECORDS);
+    
+    final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(conf, context.getTaskAttemptId(),
+        combineInputCounter, context.getTaskReporter(), null);
+
+    final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, conf, DataChannel.INOUT);
+    final BufferPusher<K, V> pusher = new BufferPusher<K, V>(context.getInputKeyClass(), context.getInputValueClass(),
+        nativeHandler);
+    final BufferPuller puller = new BufferPuller(nativeHandler);
+    return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher);
+  }
+
+  public CombinerHandler(INativeHandler nativeHandler, CombinerRunner<K, V> combiner, BufferPuller puller,
+      BufferPusher<K, V> kvPusher) throws IOException {
+    this.nativeHandler = nativeHandler;
+    this.combinerRunner = combiner;
+    this.puller = puller;
+    this.kvPusher = kvPusher;
+    nativeHandler.setCommandDispatcher(this);
+    nativeHandler.setDataReceiver(puller);
+  }
+
+  @Override
+  public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
+ if (null == command) {
+      return null;
+    }
+    if (command.equals(COMBINE)) {
+      combine();
+    }
+    return null;
+
+  }
+
+  @Override
+  public void combine() throws IOException{
+    try {
+      puller.reset();
+      combinerRunner.combine(puller, kvPusher);
+      kvPusher.flush();
+      return;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public long getId() {
+    return nativeHandler.getNativeHandler();
+  }
+
+  @Override
+  public void close() throws IOException {
+
+    if (closed) {
+      return;
+    }
+
+    if (null != puller) {
+      puller.close();
+    }
+
+    if (null != kvPusher) {
+      kvPusher.close();
+    }
+
+    if (null != nativeHandler) {
+      nativeHandler.close();
+    }
+    closed = true;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+/**
+ * Contract for a component that loads data on demand.
+ */
+public interface IDataLoader {
+
+  /**
+   * Loads data.
+   *
+   * @return size of data loaded
+   * @throws IOException if loading fails
+   */
+  int load() throws IOException;
+
+  /**
+   * Closes this loader.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
+import org.apache.hadoop.mapred.nativetask.DataChannel;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.util.NativeTaskOutput;
+import org.apache.hadoop.mapred.nativetask.util.OutputUtil;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * Java Record Reader + Java Mapper + Native Collector
+ */
+@SuppressWarnings("unchecked")
+public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Closeable {
+
+  public static String NAME = "NativeTask.MCollectorOutputHandler";
+  private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
+  public static Command GET_OUTPUT_PATH = new Command(100, "GET_OUTPUT_PATH");
+  public static Command GET_OUTPUT_INDEX_PATH = new Command(101, "GET_OUTPUT_INDEX_PATH");
+  public static Command GET_SPILL_PATH = new Command(102, "GET_SPILL_PATH");
+  public static Command GET_COMBINE_HANDLER = new Command(103, "GET_COMBINE_HANDLER");
+  
+  private NativeTaskOutput output;
+  private int spillNumber = 0;
+  private ICombineHandler combinerHandler = null;
+  private final BufferPusher<K, V> kvPusher;
+  private final INativeHandler nativeHandler;
+  private boolean closed = false;
+
+  public static <K, V> NativeCollectorOnlyHandler<K, V> create(TaskContext context) throws IOException {
+
+    
+    ICombineHandler combinerHandler = null;
+    try {
+      final TaskContext combineContext = context.copyOf();
+      combineContext.setInputKeyClass(context.getOuputKeyClass());
+      combineContext.setInputValueClass(context.getOutputValueClass());
+
+      combinerHandler = CombinerHandler.create(combineContext);
+    } catch (final ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    
+    if (null != combinerHandler) {
+      LOG.info("[NativeCollectorOnlyHandler] combiner is not null");
+    }
+
+    final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, context.getConf(), DataChannel.OUT);
+    final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(context.getOuputKeyClass(), context.getOutputValueClass(),
+        nativeHandler);
+
+    return new NativeCollectorOnlyHandler<K, V>(context, nativeHandler, kvPusher, combinerHandler);
+  }
+
+  protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler,
+      BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException {
+    Configuration conf = context.getConf();
+    TaskAttemptID id = context.getTaskAttemptId();
+    if (null == id) {
+      this.output = OutputUtil.createNativeTaskOutput(conf, "");
+    } else {
+      this.output = OutputUtil.createNativeTaskOutput(context.getConf(), context.getTaskAttemptId()
+        .toString());
+    }
+    this.combinerHandler = combiner;
+    this.kvPusher = kvPusher;
+    this.nativeHandler = nativeHandler;
+    nativeHandler.setCommandDispatcher(this);
+  }
+
+  public void collect(K key, V value, int partition) throws IOException {
+    kvPusher.collect(key, value, partition);
+  };
+
+  public void flush() throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    if (null != kvPusher) {
+      kvPusher.close();
+    }
+
+    if (null != combinerHandler) {
+      combinerHandler.close();
+    }
+
+    if (null != nativeHandler) {
+      nativeHandler.close();
+    }
+    closed = true;
+  }
+
+  @Override
+  public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
+    Path p = null;
+    if (null == command) {
+      return null;
+    }
+        
+    if (command.equals(GET_OUTPUT_PATH)) {
+      p = output.getOutputFileForWrite(-1);
+    } else if (command.equals(GET_OUTPUT_INDEX_PATH)) {
+      p = output.getOutputIndexFileForWrite(-1);
+    } else if (command.equals(GET_SPILL_PATH)) {
+      p = output.getSpillFileForWrite(spillNumber++, -1);
+      
+    } else if (command.equals(GET_COMBINE_HANDLER)) {
+      if (null == combinerHandler) {
+        return null;
+      }
+      final ReadWriteBuffer result = new ReadWriteBuffer(8);
+      
+      result.writeLong(combinerHandler.getId());
+      return result;
+    } else {
+      throw new IOException("Illegal command: " + command.toString());
+    }
+    if (p != null) {
+      final ReadWriteBuffer result = new ReadWriteBuffer();
+      result.writeString(p.toUri().getPath());
+      return result;
+    } else {
+      throw new IOException("MapOutputFile can't allocate spill/output file");
+    }
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer that reports a fixed length of one for every value and otherwise relies on
+ * {@link DefaultSerializer}.
+ */
+public class BoolWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Length reported for every value, independent of the argument. */
+  private static final int FIXED_LENGTH = 1;
+
+  /**
+   * @param w ignored
+   * @return always {@code 1}
+   * @throws IOException never thrown by this implementation
+   */
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return FIXED_LENGTH;
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer that reports a fixed length of one for every value and otherwise relies on
+ * {@link DefaultSerializer}.
+ */
+public class ByteWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Length reported for every value, independent of the argument. */
+  private static final int FIXED_LENGTH = 1;
+
+  /**
+   * @param w ignored
+   * @return always {@code 1}
+   * @throws IOException never thrown by this implementation
+   */
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return FIXED_LENGTH;
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer for {@link BytesWritable} that reads and writes only the payload bytes; no length
+ * prefix is written, the length is passed separately to {@code deserialize}.
+ */
+public class BytesWritableSerializer implements INativeComparable, INativeSerializer<BytesWritable> {
+
+  /**
+   * @param w value to measure
+   * @return the length reported by {@code w}
+   * @throws IOException never thrown by this implementation
+   */
+  @Override
+  public int getLength(BytesWritable w) throws IOException {
+    final int validBytes = w.getLength();
+    return validBytes;
+  }
+
+  /**
+   * Writes the first {@code w.getLength()} bytes of the backing array of {@code w} to
+   * {@code out}.
+   *
+   * @param w value to write
+   * @param out destination
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void serialize(BytesWritable w, DataOutput out) throws IOException {
+    final byte[] backing = w.getBytes();
+    final int validBytes = w.getLength();
+    out.write(backing, 0, validBytes);
+  }
+
+  /**
+   * Resizes {@code w} to {@code length} and fills it with exactly {@code length} bytes read from
+   * {@code in}.
+   *
+   * @param in source
+   * @param length number of bytes to read
+   * @param w value to fill
+   * @throws IOException if fewer than {@code length} bytes can be read or reading fails
+   */
+  @Override
+  public void deserialize(DataInput in, int length, BytesWritable w) throws IOException {
+    // Resize first: the backing array is fetched only after it has been sized.
+    w.setSize(length);
+    final byte[] backing = w.getBytes();
+    in.readFully(backing, 0, length);
+  }
+}
\ No newline at end of file