You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/24 10:24:06 UTC

svn commit: r1613036 - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/

Author: todd
Date: Thu Jul 24 08:24:05 2014
New Revision: 1613036

URL: http://svn.apache.org/r1613036
Log:
MAPREDUCE-6000. native-task: Simplify ByteBufferDataReader/Writer. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
    hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java

Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt Thu Jul 24 08:24:05 2014
@@ -5,3 +5,4 @@ MAPREDUCE-5985. native-task: Fix build o
 MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd)
 MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd)
 MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common (todd)
+MAPREDUCE-6000. native-task: Simplify ByteBufferDataReader/Writer (todd)

Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataReader.java Thu Jul 24 08:24:05 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred.nativetask.buffer;
 
+import com.google.common.base.Charsets;
+
 import java.io.DataInput;
 import java.io.EOFException;
 import java.io.IOException;
@@ -31,11 +33,13 @@ import java.nio.ByteBuffer;
 public class ByteBufferDataReader extends DataInputStream {
   private ByteBuffer byteBuffer;
   private char lineCache[];
+  private java.io.DataInputStream javaReader;
 
   public ByteBufferDataReader(InputBuffer buffer) {
     if (buffer != null) {
-      this.byteBuffer = buffer.getByteBuffer();
+      reset(buffer);
     }
+    javaReader = new java.io.DataInputStream(this);
   }
 
   public void reset(InputBuffer buffer) {
@@ -128,128 +132,12 @@ public class ByteBufferDataReader extend
 
   @Override
   public String readLine() throws IOException {
-
-    InputStream in = this;
-
-    char buf[] = lineCache;
-
-    if (buf == null) {
-      buf = lineCache = new char[128];
-    }
-
-    int room = buf.length;
-    int offset = 0;
-    int c;
-
-    loop: while (true) {
-      switch (c = in.read()) {
-      case -1:
-      case '\n':
-        break loop;
-
-      case '\r':
-        final int c2 = in.read();
-        if ((c2 != '\n') && (c2 != -1)) {
-          if (!(in instanceof PushbackInputStream)) {
-            in = new PushbackInputStream(in);
-          }
-          ((PushbackInputStream) in).unread(c2);
-        }
-        break loop;
-
-      default:
-        if (--room < 0) {
-          buf = new char[offset + 128];
-          room = buf.length - offset - 1;
-          System.arraycopy(lineCache, 0, buf, 0, offset);
-          lineCache = buf;
-        }
-        buf[offset++] = (char) c;
-        break;
-      }
-    }
-    if ((c == -1) && (offset == 0)) {
-      return null;
-    }
-    return String.copyValueOf(buf, 0, offset);
+    return javaReader.readLine();
   }
 
   @Override
   public final String readUTF() throws IOException {
-    return readUTF(this);
-  }
-
-  private final static String readUTF(DataInput in) throws IOException {
-    final int utflen = in.readUnsignedShort();
-    byte[] bytearr = null;
-    char[] chararr = null;
-
-    bytearr = new byte[utflen];
-    chararr = new char[utflen];
-
-    int c, char2, char3;
-    int count = 0;
-    int chararr_count = 0;
-
-    in.readFully(bytearr, 0, utflen);
-
-    while (count < utflen) {
-      c = bytearr[count] & 0xff;
-      if (c > 127) {
-        break;
-      }
-      count++;
-      chararr[chararr_count++] = (char) c;
-    }
-
-    while (count < utflen) {
-      c = bytearr[count] & 0xff;
-      switch (c >> 4) {
-      case 0:
-      case 1:
-      case 2:
-      case 3:
-      case 4:
-      case 5:
-      case 6:
-      case 7:
-        /* 0xxxxxxx */
-        count++;
-        chararr[chararr_count++] = (char) c;
-        break;
-      case 12:
-      case 13:
-        /* 110x xxxx 10xx xxxx */
-        count += 2;
-        if (count > utflen) {
-          throw new UTFDataFormatException("malformed input: partial character at end");
-        }
-        char2 = bytearr[count - 1];
-        if ((char2 & 0xC0) != 0x80) {
-          throw new UTFDataFormatException("malformed input around byte " + count);
-        }
-        chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-        break;
-      case 14:
-        /* 1110 xxxx 10xx xxxx 10xx xxxx */
-        count += 3;
-        if (count > utflen) {
-          throw new UTFDataFormatException("malformed input: partial character at end");
-        }
-        char2 = bytearr[count - 2];
-        char3 = bytearr[count - 1];
-        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-          throw new UTFDataFormatException("malformed input around byte " + (count - 1));
-        }
-        chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
-        break;
-      default:
-        /* 10xx xxxx, 1111 xxxx */
-        throw new UTFDataFormatException("malformed input around byte " + count);
-      }
-    }
-    // The number of chars produced may be less than utflen
-    return new String(chararr, 0, chararr_count);
+    return javaReader.readUTF();
   }
 
   @Override

Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/ByteBufferDataWriter.java Thu Jul 24 08:24:05 2014
@@ -22,31 +22,42 @@ import java.io.IOException;
 import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedInteger;
+import com.google.common.primitives.UnsignedInts;
 import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
 
 /**
- * write data to a output buffer
+ * DataOutputStream implementation which buffers data in a fixed-size
+ * ByteBuffer.
+ * When the byte buffer has filled up, synchronously passes the buffer
+ * to a downstream NativeDataTarget.
  */
 public class ByteBufferDataWriter extends DataOutputStream {
-  private ByteBuffer buffer;
+  private final ByteBuffer buffer;
   private final NativeDataTarget target;
 
-  private void checkSizeAndFlushNecessary(int length) throws IOException {
+  private final static byte TRUE = (byte) 1;
+  private final static byte FALSE = (byte) 0;
+  private final java.io.DataOutputStream javaWriter;
+
+  private void checkSizeAndFlushIfNecessary(int length) throws IOException {
     if (buffer.position() > 0 && buffer.remaining() < length) {
       flush();
     }
   }
 
   public ByteBufferDataWriter(NativeDataTarget handler) {
-    if (null != handler) {
-      this.buffer = handler.getOutputBuffer().getByteBuffer();
-    }
+    Preconditions.checkNotNull(handler);
+    this.buffer = handler.getOutputBuffer().getByteBuffer();
     this.target = handler;
+    this.javaWriter = new java.io.DataOutputStream(this);
   }
 
   @Override
   public synchronized void write(int v) throws IOException {
-    checkSizeAndFlushNecessary(1);
+    checkSizeAndFlushIfNecessary(1);
     buffer.put((byte) v);
   }
 
@@ -89,164 +100,72 @@ public class ByteBufferDataWriter extend
     target.finishSendData();
   }
 
-  private final static byte TRUE = (byte) 1;
-  private final static byte FALSE = (byte) 0;
-
   @Override
   public final void writeBoolean(boolean v) throws IOException {
-    checkSizeAndFlushNecessary(1);
+    checkSizeAndFlushIfNecessary(1);
     buffer.put(v ? TRUE : FALSE);
   }
 
   @Override
   public final void writeByte(int v) throws IOException {
-    checkSizeAndFlushNecessary(1);
+    checkSizeAndFlushIfNecessary(1);
     buffer.put((byte) v);
   }
 
   @Override
   public final void writeShort(int v) throws IOException {
-    checkSizeAndFlushNecessary(2);
+    checkSizeAndFlushIfNecessary(2);
     buffer.putShort((short) v);
   }
 
   @Override
   public final void writeChar(int v) throws IOException {
-    checkSizeAndFlushNecessary(2);
+    checkSizeAndFlushIfNecessary(2);
     buffer.put((byte) ((v >>> 8) & 0xFF));
     buffer.put((byte) ((v >>> 0) & 0xFF));
   }
 
   @Override
   public final void writeInt(int v) throws IOException {
-    checkSizeAndFlushNecessary(4);
+    checkSizeAndFlushIfNecessary(4);
     buffer.putInt(v);
   }
 
   @Override
   public final void writeLong(long v) throws IOException {
-    checkSizeAndFlushNecessary(8);
+    checkSizeAndFlushIfNecessary(8);
     buffer.putLong(v);
   }
 
   @Override
   public final void writeFloat(float v) throws IOException {
-    checkSizeAndFlushNecessary(4);
+    checkSizeAndFlushIfNecessary(4);
     writeInt(Float.floatToIntBits(v));
   }
 
   @Override
   public final void writeDouble(double v) throws IOException {
-    checkSizeAndFlushNecessary(8);
+    checkSizeAndFlushIfNecessary(8);
     writeLong(Double.doubleToLongBits(v));
   }
 
   @Override
   public final void writeBytes(String s) throws IOException {
-    final int len = s.length();
-
-    int remain = len;
-    int offset = 0;
-    while (remain > 0) {
-      int currentFlush = 0;
-      if (buffer.remaining() > 0) {
-        currentFlush = Math.min(buffer.remaining(), remain);
-
-        for (int i = 0; i < currentFlush; i++) {
-          buffer.put((byte) s.charAt(offset + i));
-        }
-
-        remain -= currentFlush;
-        offset += currentFlush;
-      } else {
-        flush();
-      }
-    }
+    javaWriter.writeBytes(s);
   }
 
   @Override
   public final void writeChars(String s) throws IOException {
-    final int len = s.length();
-
-    int remain = len;
-    int offset = 0;
-
-    while (remain > 0) {
-      int currentFlush = 0;
-      if (buffer.remaining() > 2) {
-        currentFlush = Math.min(buffer.remaining() / 2, remain);
-
-        for (int i = 0; i < currentFlush; i++) {
-          buffer.putChar(s.charAt(offset + i));
-        }
-
-        remain -= currentFlush;
-        offset += currentFlush;
-      } else {
-        flush();
-      }
-    }
+    javaWriter.writeChars(s);
   }
 
   @Override
   public final void writeUTF(String str) throws IOException {
-    writeUTF(str, this);
-  }
-
-  private int writeUTF(String str, DataOutput out) throws IOException {
-    final int strlen = str.length();
-    int utflen = 0;
-    int c, count = 0;
-
-    /* use charAt instead of copying String to char array */
-    for (int i = 0; i < strlen; i++) {
-      c = str.charAt(i);
-      if ((c >= 0x0001) && (c <= 0x007F)) {
-        utflen++;
-      } else if (c > 0x07FF) {
-        utflen += 3;
-      } else {
-        utflen += 2;
-      }
-    }
-
-    if (utflen > 65535) {
-      throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
-    }
-
-    final byte[] bytearr = new byte[utflen + 2];
-    bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-    bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
-
-    int i = 0;
-    for (i = 0; i < strlen; i++) {
-      c = str.charAt(i);
-      if (!((c >= 0x0001) && (c <= 0x007F))) {
-        break;
-      }
-      bytearr[count++] = (byte) c;
-    }
-
-    for (; i < strlen; i++) {
-      c = str.charAt(i);
-      if ((c >= 0x0001) && (c <= 0x007F)) {
-        bytearr[count++] = (byte) c;
-
-      } else if (c > 0x07FF) {
-        bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-        bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-        bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-      } else {
-        bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-        bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-      }
-    }
-    write(bytearr, 0, utflen + 2);
-    return utflen + 2;
+    javaWriter.writeUTF(str);
   }
 
   @Override
   public boolean hasUnFlushedData() {
-    return !(buffer.position() == 0);
+    return buffer.position() > 0;
   }
 }

Modified: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java?rev=1613036&r1=1613035&r2=1613036&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/buffer/TestByteBufferReadWrite.java Thu Jul 24 08:24:05 2014
@@ -17,20 +17,20 @@
  */
 package org.apache.hadoop.mapred.nativetask.buffer;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
 
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Shorts;
 import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
+import org.mockito.Mockito;
 
-public class TestByteBufferReadWrite extends TestCase{
-  
-  
+public class TestByteBufferReadWrite extends TestCase {
   public void testReadWrite() throws IOException {
     byte[] buff = new byte[10000];
-    
+
     InputBuffer input = new InputBuffer(buff);
     MockDataTarget target = new MockDataTarget(buff);
     ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
@@ -48,7 +48,7 @@ public class TestByteBufferReadWrite ext
     writer.writeBytes("goodboy");
     writer.writeChars("hello");
     writer.writeUTF("native task");
-    
+
     int length = target.getOutputBuffer().length();
     input.rewind(0, length);
     ByteBufferDataReader reader = new ByteBufferDataReader(input);
@@ -84,7 +84,29 @@ public class TestByteBufferReadWrite ext
     
     Assert.assertEquals(0, input.remaining());
   }
-  
+
+  /**
+   * Test that Unicode characters outside the basic multilingual plane,
+   * such as this cat face, are properly encoded.
+   */
+  public void testCatFace() throws IOException {
+    byte[] buff = new byte[10];
+    MockDataTarget target = new MockDataTarget(buff);
+    ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
+    String catFace = "\uD83D\uDE38";
+    writer.writeUTF(catFace);
+
+    // Check that our own decoder can read it
+    InputBuffer input = new InputBuffer(buff);
+    input.rewind(0, buff.length);
+    ByteBufferDataReader reader = new ByteBufferDataReader(input);
+    assertEquals(catFace, reader.readUTF());
+
+    // Check that the standard Java one can read it too
+    String fromJava = new java.io.DataInputStream(new ByteArrayInputStream(buff)).readUTF();
+    assertEquals(catFace, fromJava);
+  }
+
   public void testShortOfSpace() throws IOException {
     byte[] buff = new byte[10];
     MockDataTarget target = new MockDataTarget(buff);
@@ -100,20 +122,8 @@ public class TestByteBufferReadWrite ext
 
   public void testFlush() throws IOException {
     byte[] buff = new byte[10];
-    final Counter flushCount = new Counter();
-    final Flag finishFlag = new Flag();
-    MockDataTarget target = new MockDataTarget(buff) {
-      @Override
-      public void sendData() throws IOException {
-        flushCount.increase();
-      }
-      
-      @Override
-      public void finishSendData() throws IOException {
-        finishFlag.set(true);
-      }
-    };
-    
+    MockDataTarget target = Mockito.spy(new MockDataTarget(buff));
+
     ByteBufferDataWriter writer = new ByteBufferDataWriter(target);
     Assert.assertEquals(false, writer.hasUnFlushedData()); 
     
@@ -121,10 +131,9 @@ public class TestByteBufferReadWrite ext
     writer.write(new byte[100]);
 
     Assert.assertEquals(true, writer.hasUnFlushedData()); 
-    writer.close();    
-    Assert.assertEquals(11, flushCount.get());
-    Assert.assertEquals(true, finishFlag.get()); 
-
+    writer.close();
+    Mockito.verify(target, Mockito.times(11)).sendData();
+    Mockito.verify(target).finishSendData();
   }
   
   private static String toString(byte[] str) throws UnsupportedEncodingException {
@@ -140,42 +149,14 @@ public class TestByteBufferReadWrite ext
     }
     
     @Override
-    public void sendData() throws IOException {
-      
-    }
+    public void sendData() throws IOException {}
 
     @Override
-    public void finishSendData() throws IOException {
-       
-    }
+    public void finishSendData() throws IOException {}
 
     @Override
     public OutputBuffer getOutputBuffer() {
       return out;
     }    
   }
-  
-  private static class Counter {
-    private int count;
-    
-    public int get() {
-      return count;
-    }
-    
-    public void increase() {
-      count++;
-    }
-  }
-  
-  private static class Flag {
-    private boolean value;
-    
-    public void set(boolean status) {
-      this.value = status;
-    }
-    
-    public boolean get() {
-      return this.value;
-    }
-  }
 }