You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this view.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/24 10:24:06 UTC
svn commit: r1613036 - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/
hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/
Author: todd
Date: Thu Jul 24 08:24:05 2014
New Revision: 1613036
URL: http://svn.apache.org/r1613036
Log:
MAPREDUCE-6000. native-task: Simplify ByteBufferDataReader/Writer. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt Thu Jul 24 08:24:05 2014
@@ -5,3 +5,4 @@ MAPREDUCE-5985. native-task: Fix build o
MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd)
MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd)
MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common (todd)
+MAPREDUCE-6000. native-task: Simplify ByteBufferDataReader/Writer (todd)
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java Thu Jul 24 08:24:05 2014
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred.nativetask.buffer;
+import com.google.common.base.Charsets;
+
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
@@ -31,11 +33,13 @@ import java.nio.ByteBuffer;
public class ByteBufferDataReader extends DataInputStream {
private ByteBuffer byteBuffer;
private char lineCache[];
+ private java.io.DataInputStream javaReader;
public ByteBufferDataReader(InputBuffer buffer) {
if (buffer != null) {
- this.byteBuffer = buffer.getByteBuffer();
+ reset(buffer);
}
+ javaReader = new java.io.DataInputStream(this);
}
public void reset(InputBuffer buffer) {
@@ -128,128 +132,12 @@ public class ByteBufferDataReader extend
@Override
public String readLine() throws IOException {
-
- InputStream in = this;
-
- char buf[] = lineCache;
-
- if (buf == null) {
- buf = lineCache = new char[128];
- }
-
- int room = buf.length;
- int offset = 0;
- int c;
-
- loop: while (true) {
- switch (c = in.read()) {
- case -1:
- case '\n':
- break loop;
-
- case '\r':
- final int c2 = in.read();
- if ((c2 != '\n') && (c2 != -1)) {
- if (!(in instanceof PushbackInputStream)) {
- in = new PushbackInputStream(in);
- }
- ((PushbackInputStream) in).unread(c2);
- }
- break loop;
-
- default:
- if (--room < 0) {
- buf = new char[offset + 128];
- room = buf.length - offset - 1;
- System.arraycopy(lineCache, 0, buf, 0, offset);
- lineCache = buf;
- }
- buf[offset++] = (char) c;
- break;
- }
- }
- if ((c == -1) && (offset == 0)) {
- return null;
- }
- return String.copyValueOf(buf, 0, offset);
+ return javaReader.readLine();
}
@Override
public final String readUTF() throws IOException {
- return readUTF(this);
- }
-
- private final static String readUTF(DataInput in) throws IOException {
- final int utflen = in.readUnsignedShort();
- byte[] bytearr = null;
- char[] chararr = null;
-
- bytearr = new byte[utflen];
- chararr = new char[utflen];
-
- int c, char2, char3;
- int count = 0;
- int chararr_count = 0;
-
- in.readFully(bytearr, 0, utflen);
-
- while (count < utflen) {
- c = bytearr[count] & 0xff;
- if (c > 127) {
- break;
- }
- count++;
- chararr[chararr_count++] = (char) c;
- }
-
- while (count < utflen) {
- c = bytearr[count] & 0xff;
- switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- /* 0xxxxxxx */
- count++;
- chararr[chararr_count++] = (char) c;
- break;
- case 12:
- case 13:
- /* 110x xxxx 10xx xxxx */
- count += 2;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
- break;
- case 14:
- /* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = bytearr[count - 2];
- char3 = bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException("malformed input around byte " + (count - 1));
- }
- chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
- break;
- default:
- /* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- }
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararr_count);
+ return javaReader.readUTF();
}
@Override
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java Thu Jul 24 08:24:05 2014
@@ -22,31 +22,42 @@ import java.io.IOException;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedInteger;
+import com.google.common.primitives.UnsignedInts;
import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
/**
- * write data to a output buffer
+ * DataOutputStream implementation which buffers data in a fixed-size
+ * ByteBuffer.
+ * When the byte buffer has filled up, synchronously passes the buffer
+ * to a downstream NativeDataTarget.
*/
public class ByteBufferDataWriter extends DataOutputStream {
- private ByteBuffer buffer;
+ private final ByteBuffer buffer;
private final NativeDataTarget target;
- private void checkSizeAndFlushNecessary(int length) throws IOException {
+ private final static byte TRUE = (byte) 1;
+ private final static byte FALSE = (byte) 0;
+ private final java.io.DataOutputStream javaWriter;
+
+ private void checkSizeAndFlushIfNecessary(int length) throws IOException {
if (buffer.position() > 0 && buffer.remaining() < length) {
flush();
}
}
public ByteBufferDataWriter(NativeDataTarget handler) {
- if (null != handler) {
- this.buffer = handler.getOutputBuffer().getByteBuffer();
- }
+ Preconditions.checkNotNull(handler);
+ this.buffer = handler.getOutputBuffer().getByteBuffer();
this.target = handler;
+ this.javaWriter = new java.io.DataOutputStream(this);
}
@Override
public synchronized void write(int v) throws IOException {
- checkSizeAndFlushNecessary(1);
+ checkSizeAndFlushIfNecessary(1);
buffer.put((byte) v);
}
@@ -89,164 +100,72 @@ public class ByteBufferDataWriter extend
target.finishSendData();
}
- private final static byte TRUE = (byte) 1;
- private final static byte FALSE = (byte) 0;
-
@Override
public final void writeBoolean(boolean v) throws IOException {
- checkSizeAndFlushNecessary(1);
+ checkSizeAndFlushIfNecessary(1);
buffer.put(v ? TRUE : FALSE);
}
@Override
public final void writeByte(int v) throws IOException {
- checkSizeAndFlushNecessary(1);
+ checkSizeAndFlushIfNecessary(1);
buffer.put((byte) v);
}
@Override
public final void writeShort(int v) throws IOException {
- checkSizeAndFlushNecessary(2);
+ checkSizeAndFlushIfNecessary(2);
buffer.putShort((short) v);
}
@Override
public final void writeChar(int v) throws IOException {
- checkSizeAndFlushNecessary(2);
+ checkSizeAndFlushIfNecessary(2);
buffer.put((byte) ((v >>> 8) & 0xFF));
buffer.put((byte) ((v >>> 0) & 0xFF));
}
@Override
public final void writeInt(int v) throws IOException {
- checkSizeAndFlushNecessary(4);
+ checkSizeAndFlushIfNecessary(4);
buffer.putInt(v);
}
@Override
public final void writeLong(long v) throws IOException {
- checkSizeAndFlushNecessary(8);
+ checkSizeAndFlushIfNecessary(8);
buffer.putLong(v);
}
@Override
public final void writeFloat(float v) throws IOException {
- checkSizeAndFlushNecessary(4);
+ checkSizeAndFlushIfNecessary(4);
writeInt(Float.floatToIntBits(v));
}
@Override
public final void writeDouble(double v) throws IOException {
- checkSizeAndFlushNecessary(8);
+ checkSizeAndFlushIfNecessary(8);
writeLong(Double.doubleToLongBits(v));
}
@Override
public final void writeBytes(String s) throws IOException {
- final int len = s.length();
-
- int remain = len;
- int offset = 0;
- while (remain > 0) {
- int currentFlush = 0;
- if (buffer.remaining() > 0) {
- currentFlush = Math.min(buffer.remaining(), remain);
-
- for (int i = 0; i < currentFlush; i++) {
- buffer.put((byte) s.charAt(offset + i));
- }
-
- remain -= currentFlush;
- offset += currentFlush;
- } else {
- flush();
- }
- }
+ javaWriter.writeBytes(s);
}
@Override
public final void writeChars(String s) throws IOException {
- final int len = s.length();
-
- int remain = len;
- int offset = 0;
-
- while (remain > 0) {
- int currentFlush = 0;
- if (buffer.remaining() > 2) {
- currentFlush = Math.min(buffer.remaining() / 2, remain);
-
- for (int i = 0; i < currentFlush; i++) {
- buffer.putChar(s.charAt(offset + i));
- }
-
- remain -= currentFlush;
- offset += currentFlush;
- } else {
- flush();
- }
- }
+ javaWriter.writeChars(s);
}
@Override
public final void writeUTF(String str) throws IOException {
- writeUTF(str, this);
- }
-
- private int writeUTF(String str, DataOutput out) throws IOException {
- final int strlen = str.length();
- int utflen = 0;
- int c, count = 0;
-
- /* use charAt instead of copying String to char array */
- for (int i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- utflen++;
- } else if (c > 0x07FF) {
- utflen += 3;
- } else {
- utflen += 2;
- }
- }
-
- if (utflen > 65535) {
- throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
- }
-
- final byte[] bytearr = new byte[utflen + 2];
- bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
- bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
-
- int i = 0;
- for (i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if (!((c >= 0x0001) && (c <= 0x007F))) {
- break;
- }
- bytearr[count++] = (byte) c;
- }
-
- for (; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- bytearr[count++] = (byte) c;
-
- } else if (c > 0x07FF) {
- bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
- } else {
- bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
- }
- }
- write(bytearr, 0, utflen + 2);
- return utflen + 2;
+ javaWriter.writeUTF(str);
}
@Override
public boolean hasUnFlushedData() {
- return !(buffer.position() == 0);
+ return buffer.position() > 0;
}
}
Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java Thu Jul 24 08:24:05 2014
@@ -17,20 +17,20 @@
*/
package org.apache.hadoop.mapred.nativetask.buffer;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Shorts;
import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.mockito.Mockito;
-public class TestByteBufferReadWrite extends TestCase{
-
-
+public class TestByteBufferReadWrite extends TestCase {
public void testReadWrite() throws IOException {
byte[] buff = new byte[10000];
-
+
InputBuffer input = new InputBuffer(buff);
MockDataTarget target = new MockDataTarget(buff);
ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
@@ -48,7 +48,7 @@ public class TestByteBufferReadWrite ext
writer.writeBytes("goodboy");
writer.writeChars("hello");
writer.writeUTF("native task");
-
+
int length = target.getOutputBuffer().length();
input.rewind(0, length);
ByteBufferDataReader reader = new ByteBufferDataReader(input);
@@ -84,7 +84,29 @@ public class TestByteBufferReadWrite ext
Assert.assertEquals(0, input.remaining());
}
-
+
+ /**
+ * Test that Unicode characters outside the basic multilingual plane,
+ * such as this cat face, are properly encoded.
+ */
+ public void testCatFace() throws IOException {
+ byte[] buff = new byte[10];
+ MockDataTarget target = new MockDataTarget(buff);
+ ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+ String catFace = "\uD83D\uDE38";
+ writer.writeUTF(catFace);
+
+ // Check that our own decoder can read it
+ InputBuffer input = new InputBuffer(buff);
+ input.rewind(0, buff.length);
+ ByteBufferDataReader reader = new ByteBufferDataReader(input);
+ assertEquals(catFace, reader.readUTF());
+
+ // Check that the standard Java one can read it too
+ String fromJava = new java.io.DataInputStream(new ByteArrayInputStream(buff)).readUTF();
+ assertEquals(catFace, fromJava);
+ }
+
public void testShortOfSpace() throws IOException {
byte[] buff = new byte[10];
MockDataTarget target = new MockDataTarget(buff);
@@ -100,20 +122,8 @@ public class TestByteBufferReadWrite ext
public void testFlush() throws IOException {
byte[] buff = new byte[10];
- final Counter flushCount = new Counter();
- final Flag finishFlag = new Flag();
- MockDataTarget target = new MockDataTarget(buff) {
- @Override
- public void sendData() throws IOException {
- flushCount.increase();
- }
-
- @Override
- public void finishSendData() throws IOException {
- finishFlag.set(true);
- }
- };
-
+ MockDataTarget target = Mockito.spy(new MockDataTarget(buff));
+
ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
Assert.assertEquals(false, writer.hasUnFlushedData());
@@ -121,10 +131,9 @@ public class TestByteBufferReadWrite ext
writer.write(new byte[100]);
Assert.assertEquals(true, writer.hasUnFlushedData());
- writer.close();
- Assert.assertEquals(11, flushCount.get());
- Assert.assertEquals(true, finishFlag.get());
-
+ writer.close();
+ Mockito.verify(target, Mockito.times(11)).sendData();
+ Mockito.verify(target).finishSendData();
}
private static String toString(byte[] str) throws UnsupportedEncodingException {
@@ -140,42 +149,14 @@ public class TestByteBufferReadWrite ext
}
@Override
- public void sendData() throws IOException {
-
- }
+ public void sendData() throws IOException {}
@Override
- public void finishSendData() throws IOException {
-
- }
+ public void finishSendData() throws IOException {}
@Override
public OutputBuffer getOutputBuffer() {
return out;
}
}
-
- private static class Counter {
- private int count;
-
- public int get() {
- return count;
- }
-
- public void increase() {
- count++;
- }
- }
-
- private static class Flag {
- private boolean value;
-
- public void set(boolean status) {
- this.value = status;
- }
-
- public boolean get() {
- return this.value;
- }
- }
}