You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [3/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platforms.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.util.ServiceLoader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+import org.apache.log4j.Logger;
+
+
+/**
+ * this class will load in and init all platforms on classpath
+ * it is also the facade to check for key type support and other
+ * platform methods
+ */
+public class Platforms {
+
+ private static final Logger LOG = Logger.getLogger(Platforms.class);
+ private static final ServiceLoader<Platform> platforms = ServiceLoader.load(Platform.class);
+
+ public static void init(Configuration conf) throws IOException {
+
+ NativeSerialization.getInstance().reset();
+ synchronized (platforms) {
+ for (Platform platform : platforms) {
+ platform.init();
+ }
+ }
+ }
+
+ public static boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
+ synchronized (platforms) {
+ for (Platform platform : platforms) {
+ if (platform.support(keyClassName, serializer, job)) {
+ LOG.debug("platform " + platform.name() + " support key class"
+ + keyClassName);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static boolean define(Class keyComparator) {
+ synchronized (platforms) {
+ for (Platform platform : platforms) {
+ if (platform.define(keyComparator)) {
+ LOG.debug("platform " + platform.name() + " define comparator "
+ + keyComparator.getName());
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/StatusReportChecker.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Task.Counter;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+/**
+ * Will periodically check status from native and report to MR framework.
+ *
+ */
+public class StatusReportChecker implements Runnable {
+
+ private static Log LOG = LogFactory.getLog(StatusReportChecker.class);
+ public static int INTERVAL = 1000; // milli-seconds
+
+ private Thread checker;
+ private final TaskReporter reporter;
+ private final long interval;
+
+ public StatusReportChecker(TaskReporter reporter) {
+ this(reporter, INTERVAL);
+ }
+
+ public StatusReportChecker(TaskReporter reporter, long interval) {
+ this.reporter = reporter;
+ this.interval = interval;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(interval);
+ } catch (final InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StatusUpdater thread exiting " + "since it got interrupted");
+ }
+ break;
+ }
+ try {
+ NativeRuntime.reportStatus(reporter);
+ } catch (final IOException e) {
+ LOG.warn("Update native status got exception", e);
+ reporter.setStatus(e.toString());
+ break;
+ }
+ }
+ }
+
+ protected void initUsedCounters() {
+ reporter.getCounter(Counter.MAP_INPUT_RECORDS);
+ reporter.getCounter(Counter.MAP_OUTPUT_RECORDS);
+ reporter.getCounter(Counter.MAP_INPUT_BYTES);
+ reporter.getCounter(Counter.MAP_OUTPUT_BYTES);
+ reporter.getCounter(Counter.MAP_OUTPUT_MATERIALIZED_BYTES);
+ reporter.getCounter(Counter.COMBINE_INPUT_RECORDS);
+ reporter.getCounter(Counter.COMBINE_OUTPUT_RECORDS);
+ reporter.getCounter(Counter.REDUCE_INPUT_RECORDS);
+ reporter.getCounter(Counter.REDUCE_OUTPUT_RECORDS);
+ reporter.getCounter(Counter.REDUCE_INPUT_GROUPS);
+ reporter.getCounter(Counter.SPILLED_RECORDS);
+ reporter.getCounter(Counter.MAP_OUTPUT_BYTES);
+ reporter.getCounter(Counter.MAP_OUTPUT_RECORDS);
+ }
+
+ public synchronized void start() {
+ if (checker == null) {
+ // init counters used by native side,
+ // so they will have correct display name
+ initUsedCounters();
+ checker = new Thread(this);
+ checker.setDaemon(true);
+ checker.start();
+ }
+ }
+
+ public synchronized void stop() throws InterruptedException {
+ if (checker != null) {
+ checker.interrupt();
+ checker.join();
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/TaskContext.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+public class TaskContext {
+ private final JobConf conf;
+ private Class iKClass;
+ private Class iVClass;
+ private Class oKClass;
+ private Class oVClass;
+ private final TaskReporter reporter;
+ private final TaskAttemptID taskAttemptID;
+
+ public TaskContext(JobConf conf, Class iKClass, Class iVClass, Class oKClass, Class oVClass, TaskReporter reporter,
+ TaskAttemptID id) {
+ this.conf = conf;
+ this.iKClass = iKClass;
+ this.iVClass = iVClass;
+ this.oKClass = oKClass;
+ this.oVClass = oVClass;
+ this.reporter = reporter;
+ this.taskAttemptID = id;
+ }
+
+ public Class getInputKeyClass() {
+ return iKClass;
+ }
+
+ public void setInputKeyClass(Class klass) {
+ this.iKClass = klass;
+ }
+
+ public Class getInputValueClass() {
+ return iVClass;
+ }
+
+ public void setInputValueClass(Class klass) {
+ this.iVClass = klass;
+ }
+
+ public Class getOuputKeyClass() {
+ return this.oKClass;
+ }
+
+ public void setOutputKeyClass(Class klass) {
+ this.oKClass = klass;
+ }
+
+ public Class getOutputValueClass() {
+ return this.oVClass;
+ }
+
+ public void setOutputValueClass(Class klass) {
+ this.oVClass = klass;
+ }
+
+ public TaskReporter getTaskReporter() {
+ return this.reporter;
+ }
+
+ public TaskAttemptID getTaskAttemptId() {
+ return this.taskAttemptID;
+ }
+
+ public JobConf getConf() {
+ return this.conf;
+ }
+
+ public TaskContext copyOf() {
+ return new TaskContext(conf, iKClass, iVClass, oKClass, oVClass, reporter, taskAttemptID);
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/BufferType.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
/**
 * Kind of memory that backs a native-task buffer.
 */
public enum BufferType {
  /** Presumably a buffer allocated as a direct {@code ByteBuffer} — confirm with allocators. */
  DIRECT_BUFFER,
  /** Presumably a buffer allocated on the Java heap — confirm with allocators. */
  HEAP_BUFFER
}
\ No newline at end of file
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+/**
+ * read data from a input buffer
+ */
+public class ByteBufferDataReader extends DataInputStream {
+ private ByteBuffer byteBuffer;
+ private char lineCache[];
+
+ public ByteBufferDataReader(InputBuffer buffer) {
+ if (buffer != null) {
+ this.byteBuffer = buffer.getByteBuffer();
+ }
+ }
+
+ public void reset(InputBuffer buffer) {
+ this.byteBuffer = buffer.getByteBuffer();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return byteBuffer.get();
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ byteBuffer.get(b, off, len);
+ return len;
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ byteBuffer.get(b, 0, b.length);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ byteBuffer.get(b, off, len);
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ final int remains = byteBuffer.remaining();
+ final int skip = (remains < n) ? remains : n;
+ final int current = byteBuffer.position();
+ byteBuffer.position(current + skip);
+ return skip;
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ return (byteBuffer.get() == 1) ? true : false;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ return byteBuffer.get();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ final int ch = byteBuffer.get();
+ if (ch < 0) {
+ throw new EOFException();
+ }
+ return ch;
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ return byteBuffer.getShort();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return byteBuffer.getShort();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ return byteBuffer.getChar();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return byteBuffer.getInt();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return byteBuffer.getLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return byteBuffer.getFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return byteBuffer.getDouble();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+
+ InputStream in = this;
+
+ char buf[] = lineCache;
+
+ if (buf == null) {
+ buf = lineCache = new char[128];
+ }
+
+ int room = buf.length;
+ int offset = 0;
+ int c;
+
+ loop: while (true) {
+ switch (c = in.read()) {
+ case -1:
+ case '\n':
+ break loop;
+
+ case '\r':
+ final int c2 = in.read();
+ if ((c2 != '\n') && (c2 != -1)) {
+ if (!(in instanceof PushbackInputStream)) {
+ in = new PushbackInputStream(in);
+ }
+ ((PushbackInputStream) in).unread(c2);
+ }
+ break loop;
+
+ default:
+ if (--room < 0) {
+ buf = new char[offset + 128];
+ room = buf.length - offset - 1;
+ System.arraycopy(lineCache, 0, buf, 0, offset);
+ lineCache = buf;
+ }
+ buf[offset++] = (char) c;
+ break;
+ }
+ }
+ if ((c == -1) && (offset == 0)) {
+ return null;
+ }
+ return String.copyValueOf(buf, 0, offset);
+ }
+
+ @Override
+ public final String readUTF() throws IOException {
+ return readUTF(this);
+ }
+
+ private final static String readUTF(DataInput in) throws IOException {
+ final int utflen = in.readUnsignedShort();
+ byte[] bytearr = null;
+ char[] chararr = null;
+
+ bytearr = new byte[utflen];
+ chararr = new char[utflen];
+
+ int c, char2, char3;
+ int count = 0;
+ int chararr_count = 0;
+
+ in.readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ c = bytearr[count] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ count++;
+ chararr[chararr_count++] = (char) c;
+ }
+
+ while (count < utflen) {
+ c = bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ chararr[chararr_count++] = (char) c;
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = bytearr[count - 2];
+ char3 = bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
+ chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararr_count);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
+
+ @Override
+ public boolean hasUnReadData() {
+ return null != byteBuffer && byteBuffer.hasRemaining();
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+
+/**
+ * write data to a output buffer
+ */
+public class ByteBufferDataWriter extends DataOutputStream {
+ private ByteBuffer buffer;
+ private final NativeDataTarget target;
+
+ private void checkSizeAndFlushNecessary(int length) throws IOException {
+ if (buffer.position() > 0 && buffer.remaining() < length) {
+ flush();
+ }
+ }
+
+ public ByteBufferDataWriter(NativeDataTarget handler) {
+ if (null != handler) {
+ this.buffer = handler.getOutputBuffer().getByteBuffer();
+ }
+ this.target = handler;
+ }
+
+ @Override
+ public synchronized void write(int v) throws IOException {
+ checkSizeAndFlushNecessary(1);
+ buffer.put((byte) v);
+ }
+
+ @Override
+ public boolean shortOfSpace(int dataLength) throws IOException {
+ if (buffer.remaining() < dataLength) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized void write(byte b[], int off, int len) throws IOException {
+ int remain = len;
+ int offset = off;
+ while (remain > 0) {
+ int currentFlush = 0;
+ if (buffer.remaining() > 0) {
+ currentFlush = Math.min(buffer.remaining(), remain);
+ buffer.put(b, offset, currentFlush);
+ remain -= currentFlush;
+ offset += currentFlush;
+ } else {
+ flush();
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ target.sendData();
+ buffer.position(0);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (hasUnFlushedData()) {
+ flush();
+ }
+ target.finishSendData();
+ }
+
+ private final static byte TRUE = (byte) 1;
+ private final static byte FALSE = (byte) 0;
+
+ @Override
+ public final void writeBoolean(boolean v) throws IOException {
+ checkSizeAndFlushNecessary(1);
+ buffer.put(v ? TRUE : FALSE);
+ }
+
+ @Override
+ public final void writeByte(int v) throws IOException {
+ checkSizeAndFlushNecessary(1);
+ buffer.put((byte) v);
+ }
+
+ @Override
+ public final void writeShort(int v) throws IOException {
+ checkSizeAndFlushNecessary(2);
+ buffer.putShort((short) v);
+ }
+
+ @Override
+ public final void writeChar(int v) throws IOException {
+ checkSizeAndFlushNecessary(2);
+ buffer.put((byte) ((v >>> 8) & 0xFF));
+ buffer.put((byte) ((v >>> 0) & 0xFF));
+ }
+
+ @Override
+ public final void writeInt(int v) throws IOException {
+ checkSizeAndFlushNecessary(4);
+ buffer.putInt(v);
+ }
+
+ @Override
+ public final void writeLong(long v) throws IOException {
+ checkSizeAndFlushNecessary(8);
+ buffer.putLong(v);
+ }
+
+ @Override
+ public final void writeFloat(float v) throws IOException {
+ checkSizeAndFlushNecessary(4);
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ @Override
+ public final void writeDouble(double v) throws IOException {
+ checkSizeAndFlushNecessary(8);
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ @Override
+ public final void writeBytes(String s) throws IOException {
+ final int len = s.length();
+
+ int remain = len;
+ int offset = 0;
+ while (remain > 0) {
+ int currentFlush = 0;
+ if (buffer.remaining() > 0) {
+ currentFlush = Math.min(buffer.remaining(), remain);
+
+ for (int i = 0; i < currentFlush; i++) {
+ buffer.put((byte) s.charAt(offset + i));
+ }
+
+ remain -= currentFlush;
+ offset += currentFlush;
+ } else {
+ flush();
+ }
+ }
+ }
+
+ @Override
+ public final void writeChars(String s) throws IOException {
+ final int len = s.length();
+
+ int remain = len;
+ int offset = 0;
+
+ while (remain > 0) {
+ int currentFlush = 0;
+ if (buffer.remaining() > 2) {
+ currentFlush = Math.min(buffer.remaining() / 2, remain);
+
+ for (int i = 0; i < currentFlush; i++) {
+ buffer.putChar(s.charAt(offset + i));
+ }
+
+ remain -= currentFlush;
+ offset += currentFlush;
+ } else {
+ flush();
+ }
+ }
+ }
+
+ @Override
+ public final void writeUTF(String str) throws IOException {
+ writeUTF(str, this);
+ }
+
+ private int writeUTF(String str, DataOutput out) throws IOException {
+ final int strlen = str.length();
+ int utflen = 0;
+ int c, count = 0;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ if (utflen > 65535) {
+ throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+ }
+
+ final byte[] bytearr = new byte[utflen + 2];
+ bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ bytearr[count++] = (byte) c;
+ }
+
+ for (; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte) c;
+
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ } else {
+ bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ write(bytearr, 0, utflen + 2);
+ return utflen + 2;
+ }
+
+ @Override
+ public boolean hasUnFlushedData() {
+ return !(buffer.position() == 0);
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataInputStream.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataInput;
+import java.io.InputStream;
+
/**
 * Base type for native-task input streams: an {@link InputStream} that also
 * implements {@link DataInput}, plus a query for pending unread data.
 */
public abstract class DataInputStream
    extends InputStream
    implements DataInput {

  /**
   * Reports whether bytes are still available to be read.
   *
   * @return {@code true} if unread data remains
   */
  public abstract boolean hasUnReadData();
}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DataOutputStream.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
/**
 * Base type for native-task output streams: an {@link OutputStream} that also
 * implements {@link DataOutput}, plus queries about free space and pending data.
 */
public abstract class DataOutputStream
    extends OutputStream
    implements DataOutput {

  /**
   * Checks whether the stream's buffer lacks room for {@code length} more bytes.
   *
   * @param length number of bytes the caller intends to write
   * @return {@code true} if there is not enough space for {@code length} bytes,
   *         {@code false} if they fit
   * @throws IOException if the check itself fails
   */
  public abstract boolean shortOfSpace(int length) throws IOException;

  /**
   * Checks whether bytes have been written but not yet flushed.
   *
   * @return {@code true} if unflushed data is held by the stream
   */
  public abstract boolean hasUnFlushedData();
}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/DirectBufferPool.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
/**
 * A process-wide pool of direct {@link ByteBuffer}s, grouped by capacity.
 *
 * <p>Buffers handed back through {@link #returnBuffer(ByteBuffer)} are cleared and
 * kept for reuse by {@link #borrowBuffer(int)}, which falls back to
 * {@link ByteBuffer#allocateDirect(int)} when no pooled buffer of the requested
 * capacity is available. Pooled buffers are held only through
 * {@link WeakReference}s, so the garbage collector may still reclaim an idle
 * pooled buffer; such cleared entries are skipped when borrowing.
 */
public class DirectBufferPool {

  /** Lazily created shared instance; only accessed while holding the class lock. */
  private static DirectBufferPool directBufferPool = null;

  /** Pooled buffers keyed by their capacity in bytes. */
  private final ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> bufferMap =
      new ConcurrentHashMap<Integer, Queue<WeakReference<ByteBuffer>>>();

  private DirectBufferPool() {
  }

  /**
   * Returns the shared pool, creating it on first use.
   *
   * @return the shared pool instance
   */
  public static synchronized DirectBufferPool getInstance() {
    if (null == directBufferPool) {
      directBufferPool = new DirectBufferPool();
    }
    return directBufferPool;
  }

  /**
   * Drops the reference to the shared pool so that the next {@link #getInstance()}
   * call creates a fresh one.
   *
   * <p>Synchronized like {@link #getInstance()} so that both methods access the
   * static field under the same lock; without it a concurrent caller could keep
   * observing a stale instance.
   */
  public static synchronized void destoryInstance() {
    directBufferPool = null;
  }

  /**
   * Borrows a direct buffer of exactly {@code capacity} bytes.
   *
   * @param capacity required capacity in bytes
   * @return a pooled buffer of that capacity (cleared when it was returned), or a
   *         newly allocated direct buffer if no live pooled buffer exists
   * @throws IOException declared for callers; not thrown by this implementation
   */
  public synchronized ByteBuffer borrowBuffer(int capacity) throws IOException {
    Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
    if (null == list) {
      return ByteBuffer.allocateDirect(capacity);
    }
    WeakReference<ByteBuffer> ref;
    // Skip entries whose buffers have already been reclaimed by the GC.
    while ((ref = list.poll()) != null) {
      ByteBuffer buf = ref.get();
      if (buf != null) {
        return buf;
      }
    }
    return ByteBuffer.allocateDirect(capacity);
  }

  /**
   * Returns a buffer to the pool. The buffer is cleared and cached under its
   * capacity. It may be handed out again by {@link #borrowBuffer(int)}, so the
   * caller must not keep using it.
   *
   * @param buffer the direct buffer to pool
   * @throws IOException if {@code buffer} is {@code null} or not a direct buffer
   */
  public void returnBuffer(ByteBuffer buffer) throws IOException {
    if (null == buffer || !buffer.isDirect()) {
      throw new IOException("the buffer is null or the buffer returned is not direct buffer");
    }

    buffer.clear();
    int capacity = buffer.capacity();
    Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
    if (null == list) {
      list = new ConcurrentLinkedQueue<WeakReference<ByteBuffer>>();
      // Another thread may have registered a queue for this capacity concurrently.
      Queue<WeakReference<ByteBuffer>> prev = bufferMap.putIfAbsent(capacity, list);
      if (prev != null) {
        list = prev;
      }
    }
    list.add(new WeakReference<ByteBuffer>(buffer));
  }

  /**
   * Returns the number of pool entries for {@code capacity}. The count may include
   * entries whose buffers were already reclaimed by the garbage collector.
   *
   * @param capacity buffer capacity in bytes
   * @return the number of entries, or 0 if no buffer of that capacity was ever returned
   */
  int getBufCountsForCapacity(int capacity) {
    final Queue<WeakReference<ByteBuffer>> list = bufferMap.get(capacity);
    return null == list ? 0 : list.size();
  }

}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class InputBuffer {
+
+ private ByteBuffer byteBuffer;
+ private final BufferType type;
+
+ public InputBuffer(BufferType type, int inputSize) throws IOException {
+
+ final int capacity = inputSize;
+ this.type = type;
+
+ if (capacity > 0) {
+
+ switch (type) {
+ case DIRECT_BUFFER:
+ this.byteBuffer = DirectBufferPool.getInstance().borrowBuffer(capacity);
+ this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+ break;
+ case HEAP_BUFFER:
+ this.byteBuffer = ByteBuffer.allocate(capacity);
+ this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+ break;
+ }
+ byteBuffer.position(0);
+ byteBuffer.limit(0);
+ }
+ }
+
+ public BufferType getType() {
+ return this.type;
+ }
+
+ public InputBuffer(byte[] bytes) {
+ this.type = BufferType.HEAP_BUFFER;
+ if (bytes.length > 0) {
+ this.byteBuffer = ByteBuffer.wrap(bytes);
+ this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+ byteBuffer.position(0);
+ byteBuffer.limit(0);
+ }
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return this.byteBuffer;
+ }
+
+ public int length() {
+ if (null == byteBuffer) {
+ return 0;
+ }
+ return byteBuffer.limit();
+ }
+
+ public void rewind(int startOffset, int length) {
+ if (null == byteBuffer) {
+ return;
+ }
+ byteBuffer.position(startOffset);
+ byteBuffer.limit(length);
+ }
+
+ public int remaining() {
+ if (null == byteBuffer) {
+ return 0;
+ }
+ return byteBuffer.remaining();
+ }
+
+ public int position() {
+ if (null == byteBuffer) {
+ return 0;
+ }
+ return byteBuffer.position();
+ }
+
+ public int position(int pos) {
+ if (null == byteBuffer) {
+ return 0;
+ }
+
+ byteBuffer.position(pos);
+ return pos;
+ }
+
+ public int capacity() {
+ if (null == byteBuffer) {
+ return 0;
+ }
+ return byteBuffer.capacity();
+ }
+
+ public byte[] array() {
+ if (null == byteBuffer) {
+ return null;
+ }
+ return byteBuffer.array();
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class OutputBuffer {
+ protected ByteBuffer byteBuffer;
+ private final BufferType type;
+
+ public OutputBuffer(BufferType type, int outputBufferCapacity) {
+
+ this.type = type;
+ if (outputBufferCapacity > 0) {
+ switch (type) {
+ case DIRECT_BUFFER:
+ this.byteBuffer = ByteBuffer.allocateDirect(outputBufferCapacity);
+ this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+ break;
+ case HEAP_BUFFER:
+ this.byteBuffer = ByteBuffer.allocate(outputBufferCapacity);
+ this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+ break;
+ }
+ }
+ }
+
+ public OutputBuffer(byte[] bytes) {
+ this.type = BufferType.HEAP_BUFFER;
+ final int outputBufferCapacity = bytes.length;
+ if (outputBufferCapacity > 0) {
+ this.byteBuffer = ByteBuffer.wrap(bytes);
+ this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+ this.byteBuffer.position(0);
+ }
+ }
+
+ public BufferType getType() {
+ return this.type;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return this.byteBuffer;
+ }
+
+ public int length() {
+ return byteBuffer.position();
+ }
+
+ public void rewind() {
+ byteBuffer.position(0);
+ }
+
+ public int limit() {
+ return byteBuffer.limit();
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter;
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * load data into a buffer signaled by a {@link BufferPuller}
+ */
+public class BufferPullee<IK, IV> implements IDataLoader {
+
+ public static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+ private final SizedWritable<IK> tmpInputKey;
+ private final SizedWritable<IV> tmpInputValue;
+ private boolean inputKVBufferd = false;
+ private RawKeyValueIterator rIter;
+ private ByteBufferDataWriter nativeWriter;
+ protected KVSerializer<IK, IV> serializer;
+ private final OutputBuffer outputBuffer;
+ private final NativeDataTarget target;
+ private boolean closed = false;
+
+ public BufferPullee(Class<IK> iKClass, Class<IV> iVClass, RawKeyValueIterator rIter, NativeDataTarget target)
+ throws IOException {
+ this.rIter = rIter;
+ tmpInputKey = new SizedWritable<IK>(iKClass);
+ tmpInputValue = new SizedWritable<IV>(iVClass);
+
+ if (null != iKClass && null != iVClass) {
+ this.serializer = new KVSerializer<IK, IV>(iKClass, iVClass);
+ }
+ this.outputBuffer = target.getOutputBuffer();
+ this.target = target;
+ }
+
+ @Override
+ public int load() throws IOException {
+ if (closed) {
+ return 0;
+ }
+
+ if (null == outputBuffer) {
+ throw new IOException("output buffer not set");
+ }
+
+ this.nativeWriter = new ByteBufferDataWriter(target);
+ outputBuffer.rewind();
+
+ int written = 0;
+ boolean firstKV = true;
+
+ if (inputKVBufferd) {
+ written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue);
+ inputKVBufferd = false;
+ firstKV = false;
+ }
+
+ while (rIter.next()) {
+ inputKVBufferd = false;
+ tmpInputKey.readFields(rIter.getKey());
+ tmpInputValue.readFields(rIter.getValue());
+ serializer.updateLength(tmpInputKey, tmpInputValue);
+
+ final int kvSize = tmpInputKey.length + tmpInputValue.length + KV_HEADER_LENGTH;
+
+ if (!firstKV && nativeWriter.shortOfSpace(kvSize)) {
+ inputKVBufferd = true;
+ break;
+ } else {
+ written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue);
+ firstKV = false;
+ }
+ }
+
+ if (nativeWriter.hasUnFlushedData()) {
+ nativeWriter.flush();
+ }
+ return written;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ if (null != rIter) {
+ rIter.close();
+ }
+ if (null != nativeWriter) {
+ nativeWriter.close();
+ }
+ closed = true;
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,187 @@
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.DataReceiver;
+import org.apache.hadoop.mapred.nativetask.NativeDataSource;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * actively signal a {@link BufferPullee} to load data into buffer and receive
+ */
+public class BufferPuller implements RawKeyValueIterator, DataReceiver {
+
+ private static Log LOG = LogFactory.getLog(BufferPuller.class);
+
+ public final static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+ byte[] keyBytes = new byte[0];
+ byte[] valueBytes = new byte[0];
+
+ private InputBuffer inputBuffer;
+ private InputBuffer asideBuffer;
+
+ int remain = 0;
+
+ private ByteBufferDataReader nativeReader;
+
+ DataInputBuffer keyBuffer = new DataInputBuffer();
+ DataInputBuffer valueBuffer = new DataInputBuffer();
+
+ private boolean noMoreData = false;
+
+ private NativeDataSource input;
+ private boolean closed = false;
+
+ public BufferPuller(NativeDataSource handler) throws IOException {
+ this.input = handler;
+ this.inputBuffer = handler.getInputBuffer();
+ nativeReader = new ByteBufferDataReader(null);
+ this.asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, inputBuffer.capacity());
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return keyBuffer;
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return valueBuffer;
+ }
+
+ public void reset() {
+ noMoreData = false;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (closed) {
+ return false;
+ }
+
+ if (noMoreData) {
+ return false;
+ }
+ final int asideRemain = asideBuffer.remaining();
+ final int inputRemain = inputBuffer.remaining();
+
+ if (asideRemain == 0 && inputRemain == 0) {
+ input.loadData();
+ }
+
+ if (asideBuffer.remaining() > 0) {
+ return nextKeyValue(asideBuffer);
+ } else if (inputBuffer.remaining() > 0) {
+ return nextKeyValue(inputBuffer);
+ } else {
+ noMoreData = true;
+ return false;
+ }
+ }
+
+ private boolean nextKeyValue(InputBuffer buffer) throws IOException {
+ if (closed) {
+ return false;
+ }
+
+ nativeReader.reset(buffer);
+
+ final int keyLength = nativeReader.readInt();
+ if (keyBytes.length < keyLength) {
+ keyBytes = new byte[keyLength];
+ }
+
+ final int valueLength = nativeReader.readInt();
+ if (valueBytes.length < valueLength) {
+ valueBytes = new byte[valueLength];
+ }
+
+ nativeReader.read(keyBytes, 0, keyLength);
+ nativeReader.read(valueBytes, 0, valueLength);
+
+ keyBuffer.reset(keyBytes, keyLength);
+ valueBuffer.reset(valueBytes, valueLength);
+
+ return true;
+ }
+
+ @Override
+ public boolean receiveData() throws IOException {
+ if (closed) {
+ return false;
+ }
+
+ final ByteBuffer input = inputBuffer.getByteBuffer();
+
+ if (null != asideBuffer && asideBuffer.length() > 0) {
+ if (asideBuffer.remaining() > 0) {
+ final byte[] output = asideBuffer.getByteBuffer().array();
+ final int write = Math.min(asideBuffer.remaining(), input.remaining());
+ input.get(output, asideBuffer.position(), write);
+ asideBuffer.position(asideBuffer.position() + write);
+ }
+
+ if (asideBuffer.remaining() == 0) {
+ asideBuffer.position(0);
+ }
+ }
+
+ if (input.remaining() == 0) {
+ return true;
+ }
+
+ if (input.remaining() < KV_HEADER_LENGTH) {
+ throw new IOException("incomplete data, input length is: " + input.remaining());
+ }
+ final int position = input.position();
+ final int keyLength = input.getInt();
+ final int valueLength = input.getInt();
+ input.position(position);
+ final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH;
+ final int remaining = input.remaining();
+
+ if (kvLength > remaining) {
+ if (null == asideBuffer || asideBuffer.capacity() < kvLength) {
+ asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength);
+ }
+ asideBuffer.rewind(0, kvLength);
+
+ input.get(asideBuffer.array(), 0, remaining);
+ asideBuffer.position(remaining);
+ }
+ return true;
+ }
+
+ @Override
+ public Progress getProgress() {
+ return null;
+ }
+
+ /**
+ * Closes the iterator so that the underlying streams can be closed.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ if (null != nativeReader) {
+ nativeReader.close();
+ }
+ closed = true;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * collect data when signaled
+ */
+public class BufferPushee<OK, OV> implements Closeable {
+
+ private static Log LOG = LogFactory.getLog(BufferPushee.class);
+
+ public final static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+ private InputBuffer asideBuffer;
+ private final SizedWritable<OK> tmpOutputKey;
+ private final SizedWritable<OV> tmpOutputValue;
+ private RecordWriter<OK, OV> writer;
+ private ByteBufferDataReader nativeReader;
+
+ private KVSerializer<OK, OV> deserializer;
+ private boolean closed = false;
+
+ public BufferPushee(Class<OK> oKClass, Class<OV> oVClass, RecordWriter<OK, OV> writer) throws IOException {
+ tmpOutputKey = new SizedWritable<OK>(oKClass);
+ tmpOutputValue = new SizedWritable<OV>(oVClass);
+
+ this.writer = writer;
+
+ if (null != oKClass && null != oVClass) {
+ this.deserializer = new KVSerializer<OK, OV>(oKClass, oVClass);
+ }
+ this.nativeReader = new ByteBufferDataReader(null);
+ }
+
+ public boolean collect(InputBuffer buffer) throws IOException {
+ if (closed) {
+ return false;
+ }
+
+ final ByteBuffer input = buffer.getByteBuffer();
+ if (null != asideBuffer && asideBuffer.length() > 0) {
+ if (asideBuffer.remaining() > 0) {
+ final byte[] output = asideBuffer.getByteBuffer().array();
+ final int write = Math.min(asideBuffer.remaining(), input.remaining());
+ input.get(output, asideBuffer.position(), write);
+ asideBuffer.position(asideBuffer.position() + write);
+ }
+
+ if (asideBuffer.remaining() == 0 && asideBuffer.position() > 0) {
+ asideBuffer.position(0);
+ write(asideBuffer);
+ asideBuffer.rewind(0, 0);
+ }
+ }
+
+ if (input.remaining() == 0) {
+ return true;
+ }
+
+ if (input.remaining() < KV_HEADER_LENGTH) {
+ throw new IOException("incomplete data, input length is: " + input.remaining());
+ }
+ final int position = input.position();
+ final int keyLength = input.getInt();
+ final int valueLength = input.getInt();
+ input.position(position);
+ final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH;
+ final int remaining = input.remaining();
+
+ if (kvLength > remaining) {
+ if (null == asideBuffer || asideBuffer.capacity() < kvLength) {
+ asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength);
+ }
+ asideBuffer.rewind(0, kvLength);
+
+ input.get(asideBuffer.array(), 0, remaining);
+ asideBuffer.position(remaining);
+ } else {
+ write(buffer);
+ }
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ private boolean write(InputBuffer input) throws IOException {
+ if (closed) {
+ return false;
+ }
+ int totalRead = 0;
+ final int remain = input.remaining();
+ this.nativeReader.reset(input);
+ while (remain > totalRead) {
+ final int read = deserializer.deserializeKV(nativeReader, tmpOutputKey, tmpOutputValue);
+ if (read != 0) {
+ totalRead += read;
+ writer.write((OK) (tmpOutputKey.v), (OV) (tmpOutputValue.v));
+ }
+ }
+ if (remain != totalRead) {
+ throw new IOException("We expect to read " + remain + ", but we actually read: " + totalRead);
+ }
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ if (null != writer) {
+ writer.close(null);
+ }
+ if (null != nativeReader) {
+ nativeReader.close();
+ }
+ closed = true;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter;
+import org.apache.hadoop.mapred.nativetask.serde.IKVSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * actively push data into a buffer and signal a {@link BufferPushee} to collect it
+ */
+public class BufferPusher<K, V> implements OutputCollector<K, V> {
+
+ private static Log LOG = LogFactory.getLog(BufferPusher.class);
+
+ private final SizedWritable<K> tmpInputKey;
+ private final SizedWritable<V> tmpInputValue;
+ private ByteBufferDataWriter out;
+ IKVSerializer serializer;
+ private boolean closed = false;
+
+ public BufferPusher(Class<K> iKClass, Class<V> iVClass, NativeDataTarget target) throws IOException {
+ tmpInputKey = new SizedWritable<K>(iKClass);
+ tmpInputValue = new SizedWritable<V>(iVClass);
+
+ if (null != iKClass && null != iVClass) {
+ this.serializer = new KVSerializer<K, V>(iKClass, iVClass);
+ }
+ this.out = new ByteBufferDataWriter(target);
+ }
+
+ public void collect(K key, V value, int partition) throws IOException {
+ tmpInputKey.reset(key);
+ tmpInputValue.reset(value);
+ serializer.serializePartitionKV(out, partition, tmpInputKey, tmpInputValue);
+ };
+
+ @Override
+ public void collect(K key, V value) throws IOException {
+ if (closed) {
+ return;
+ }
+ tmpInputKey.reset(key);
+ tmpInputValue.reset(value);
+ serializer.serializeKV(out, tmpInputKey, tmpInputValue);
+ };
+
+ public void flush() throws IOException {
+ if (null != out) {
+ if (out.hasUnFlushedData()) {
+ out.flush();
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ if (null != out) {
+ out.close();
+ }
+ closed = true;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task.CombinerRunner;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.DataChannel;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+/**
+ * Runs a Java {@link CombinerRunner} on behalf of a native handler.
+ * <p>
+ * When the {@code COMBINE} command arrives through {@link #onCall}, records are pulled from the
+ * native handler through a {@link BufferPuller}, passed through the combiner, and the combined
+ * output is pushed back through a {@link BufferPusher}.
+ */
+public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher {
+
+  /** Handler name passed to {@link NativeBatchProcessor#create}. */
+  public static final String NAME = "NativeTask.CombineHandler";
+  private static final Log LOG = LogFactory.getLog(CombinerHandler.class);
+  /** Command id 1; not handled by {@link #onCall} in this class. */
+  public static final Command LOAD = new Command(1, "Load");
+  /** Command id 4; triggers {@link #combine()} when received by {@link #onCall}. */
+  public static final Command COMBINE = new Command(4, "Combine");
+  /** Runs the configured combiner over the pulled records. */
+  public final CombinerRunner<K, V> combinerRunner;
+
+  private final INativeHandler nativeHandler;
+  private final BufferPuller puller;
+  private final BufferPusher<K, V> kvPusher;
+  private boolean closed = false;
+
+  /**
+   * Creates a combiner handler for the task, or returns {@code null} when no combiner class is
+   * configured.
+   * <p>
+   * The combiner class is read from {@link Constants#MAPRED_COMBINER_CLASS}, falling back to
+   * {@link MRJobConfig#COMBINE_CLASS_ATTR}. The handler works on a copy of the task
+   * configuration in which the serialization framework is forced to writable serialization.
+   *
+   * @param context task context supplying configuration, reporter, attempt id and input types
+   * @return a new handler, or {@code null} if no combiner is configured
+   * @throws IOException on I/O failure while setting up the handler
+   * @throws ClassNotFoundException propagated from combiner setup
+   */
+  public static <K, V> ICombineHandler create(TaskContext context) throws IOException, ClassNotFoundException {
+    final JobConf conf = new JobConf(context.getConf());
+    conf.set(Constants.SERIALIZATION_FRAMEWORK,
+        String.valueOf(SerializationFramework.WRITABLE_SERIALIZATION.getType()));
+    String combinerClazz = conf.get(Constants.MAPRED_COMBINER_CLASS);
+    if (null == combinerClazz) {
+      combinerClazz = conf.get(MRJobConfig.COMBINE_CLASS_ATTR);
+    }
+
+    if (null == combinerClazz) {
+      return null;
+    } else {
+      LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz);
+    }
+
+    final Counter combineInputCounter = context.getTaskReporter().getCounter(COMBINE_INPUT_RECORDS);
+
+    final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(conf, context.getTaskAttemptId(),
+        combineInputCounter, context.getTaskReporter(), null);
+
+    final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, conf, DataChannel.INOUT);
+    final BufferPusher<K, V> pusher = new BufferPusher<K, V>(context.getInputKeyClass(), context.getInputValueClass(),
+        nativeHandler);
+    final BufferPuller puller = new BufferPuller(nativeHandler);
+    return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher);
+  }
+
+  /**
+   * Wires the handler to {@code nativeHandler}: this object becomes its command dispatcher and
+   * {@code puller} becomes its data receiver.
+   */
+  public CombinerHandler(INativeHandler nativeHandler, CombinerRunner<K, V> combiner, BufferPuller puller,
+      BufferPusher<K, V> kvPusher) throws IOException {
+    this.nativeHandler = nativeHandler;
+    this.combinerRunner = combiner;
+    this.puller = puller;
+    this.kvPusher = kvPusher;
+    nativeHandler.setCommandDispatcher(this);
+    nativeHandler.setDataReceiver(puller);
+  }
+
+  /**
+   * Dispatches a command. {@code COMBINE} runs {@link #combine()}; any other command,
+   * including {@code null}, is ignored. Always returns {@code null}.
+   */
+  @Override
+  public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
+    if (null == command) {
+      return null;
+    }
+    if (command.equals(COMBINE)) {
+      combine();
+    }
+    return null;
+  }
+
+  /**
+   * Resets the puller, runs the combiner from the puller into the pusher, then flushes the pusher.
+   *
+   * @throws IOException if combining fails. I/O errors propagate unchanged; other failures are
+   *         wrapped. On interruption the thread's interrupt flag is restored before wrapping.
+   */
+  @Override
+  public void combine() throws IOException {
+    try {
+      puller.reset();
+      combinerRunner.combine(puller, kvPusher);
+      kvPusher.flush();
+    } catch (IOException e) {
+      // Already the declared type; avoid wrapping it a second time.
+      throw e;
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Returns the id reported by the underlying native handler. */
+  @Override
+  public long getId() {
+    return nativeHandler.getNativeHandler();
+  }
+
+  /**
+   * Closes the puller, the pusher and the native handler, in that order. Each close is attempted
+   * even if an earlier one throws, so the native handler is not leaked. Subsequent calls are no-ops.
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      if (null != puller) {
+        puller.close();
+      }
+    } finally {
+      try {
+        if (null != kvPusher) {
+          kvPusher.close();
+        }
+      } finally {
+        if (null != nativeHandler) {
+          nativeHandler.close();
+        }
+      }
+    }
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+/**
+ * Loads data on demand.
+ */
+public interface IDataLoader {
+
+  /**
+   * Loads data.
+   *
+   * @return the size of the data that was loaded
+   * @throws IOException if loading fails
+   */
+  int load() throws IOException;
+
+  /**
+   * Closes this loader.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
+import org.apache.hadoop.mapred.nativetask.DataChannel;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.util.NativeTaskOutput;
+import org.apache.hadoop.mapred.nativetask.util.OutputUtil;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * Java Record Reader + Java Mapper + Native Collector
+ */
+@SuppressWarnings("unchecked")
+public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Closeable {
+
+ public static String NAME = "NativeTask.MCollectorOutputHandler";
+ private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
+ public static Command GET_OUTPUT_PATH = new Command(100, "GET_OUTPUT_PATH");
+ public static Command GET_OUTPUT_INDEX_PATH = new Command(101, "GET_OUTPUT_INDEX_PATH");
+ public static Command GET_SPILL_PATH = new Command(102, "GET_SPILL_PATH");
+ public static Command GET_COMBINE_HANDLER = new Command(103, "GET_COMBINE_HANDLER");
+
+ private NativeTaskOutput output;
+ private int spillNumber = 0;
+ private ICombineHandler combinerHandler = null;
+ private final BufferPusher<K, V> kvPusher;
+ private final INativeHandler nativeHandler;
+ private boolean closed = false;
+
+ public static <K, V> NativeCollectorOnlyHandler<K, V> create(TaskContext context) throws IOException {
+
+
+ ICombineHandler combinerHandler = null;
+ try {
+ final TaskContext combineContext = context.copyOf();
+ combineContext.setInputKeyClass(context.getOuputKeyClass());
+ combineContext.setInputValueClass(context.getOutputValueClass());
+
+ combinerHandler = CombinerHandler.create(combineContext);
+ } catch (final ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ if (null != combinerHandler) {
+ LOG.info("[NativeCollectorOnlyHandler] combiner is not null");
+ }
+
+ final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, context.getConf(), DataChannel.OUT);
+ final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(context.getOuputKeyClass(), context.getOutputValueClass(),
+ nativeHandler);
+
+ return new NativeCollectorOnlyHandler<K, V>(context, nativeHandler, kvPusher, combinerHandler);
+ }
+
+ protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler,
+ BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException {
+ Configuration conf = context.getConf();
+ TaskAttemptID id = context.getTaskAttemptId();
+ if (null == id) {
+ this.output = OutputUtil.createNativeTaskOutput(conf, "");
+ } else {
+ this.output = OutputUtil.createNativeTaskOutput(context.getConf(), context.getTaskAttemptId()
+ .toString());
+ }
+ this.combinerHandler = combiner;
+ this.kvPusher = kvPusher;
+ this.nativeHandler = nativeHandler;
+ nativeHandler.setCommandDispatcher(this);
+ }
+
+ public void collect(K key, V value, int partition) throws IOException {
+ kvPusher.collect(key, value, partition);
+ };
+
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ if (null != kvPusher) {
+ kvPusher.close();
+ }
+
+ if (null != combinerHandler) {
+ combinerHandler.close();
+ }
+
+ if (null != nativeHandler) {
+ nativeHandler.close();
+ }
+ closed = true;
+ }
+
+ @Override
+ public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
+ Path p = null;
+ if (null == command) {
+ return null;
+ }
+
+ if (command.equals(GET_OUTPUT_PATH)) {
+ p = output.getOutputFileForWrite(-1);
+ } else if (command.equals(GET_OUTPUT_INDEX_PATH)) {
+ p = output.getOutputIndexFileForWrite(-1);
+ } else if (command.equals(GET_SPILL_PATH)) {
+ p = output.getSpillFileForWrite(spillNumber++, -1);
+
+ } else if (command.equals(GET_COMBINE_HANDLER)) {
+ if (null == combinerHandler) {
+ return null;
+ }
+ final ReadWriteBuffer result = new ReadWriteBuffer(8);
+
+ result.writeLong(combinerHandler.getId());
+ return result;
+ } else {
+ throw new IOException("Illegal command: " + command.toString());
+ }
+ if (p != null) {
+ final ReadWriteBuffer result = new ReadWriteBuffer();
+ result.writeString(p.toUri().getPath());
+ return result;
+ } else {
+ throw new IOException("MapOutputFile can't allocate spill/output file");
+ }
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer whose reported length is always one byte.
+ * Presumably registered for boolean writables; confirm against the serializer registry.
+ */
+public class BoolWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Serialized size, in bytes, reported for every value. */
+  private static final int SERIALIZED_LENGTH = 1;
+
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return SERIALIZED_LENGTH;
+  }
+}
\ No newline at end of file
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer whose reported length is always one byte.
+ * Presumably registered for byte writables; confirm against the serializer registry.
+ */
+public class ByteWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Serialized size, in bytes, reported for every value. */
+  private static final int SERIALIZED_LENGTH = 1;
+
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return SERIALIZED_LENGTH;
+  }
+}
\ No newline at end of file
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer for {@link BytesWritable} that writes only the valid payload bytes, with no
+ * length prefix. The length is reported by {@link #getLength} and passed to
+ * {@link #deserialize}.
+ */
+public class BytesWritableSerializer implements INativeComparable, INativeSerializer<BytesWritable> {
+
+  /** Returns the number of valid bytes in {@code w}. */
+  @Override
+  public int getLength(BytesWritable w) throws IOException {
+    return w.getLength();
+  }
+
+  /** Writes the first {@code getLength()} bytes of the backing array to {@code out}. */
+  @Override
+  public void serialize(BytesWritable w, DataOutput out) throws IOException {
+    final byte[] payload = w.getBytes();
+    final int validLength = w.getLength();
+    out.write(payload, 0, validLength);
+  }
+
+  /** Resizes {@code w} to {@code length}, then reads exactly that many bytes from {@code in}. */
+  @Override
+  public void deserialize(DataInput in, int length, BytesWritable w) throws IOException {
+    // setSize may reallocate the backing array, so fetch getBytes() only afterwards.
+    w.setSize(length);
+    final byte[] target = w.getBytes();
+    in.readFully(target, 0, length);
+  }
+}
\ No newline at end of file