You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 05:41:07 UTC
svn commit: r1079163 - in /hadoop/common/branches/yahoo-merge/src:
java/org/apache/hadoop/io/DataInputByteBuffer.java
java/org/apache/hadoop/io/DataOutputByteBuffer.java
test/core/org/apache/hadoop/io/TestDataByteBuffers.java
Author: omalley
Date: Tue Mar 8 04:41:07 2011
New Revision: 1079163
URL: http://svn.apache.org/viewvc?rev=1079163&view=rev
Log:
commit 595594756257beb8d4d3349f2117d677389f3c6f
Author: Arun C Murthy <ac...@apache.org>
Date: Sun Jan 23 00:09:31 2011 -0800
. Added Data(In,Out)putByteBuffer to work with ByteBuffer similar to Data(In,Out)putBuffer for byte[]. Contributed by Chris Douglas.
+++ b/YAHOO-CHANGES.txt
+ . Added Data(In,Out)putByteBuffer to work with ByteBuffer
+ similar to Data(In,Out)putBuffer for byte[]. (cdouglas via acmurthy)
+
Added:
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataInputByteBuffer.java
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataOutputByteBuffer.java
hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/io/TestDataByteBuffers.java
Added: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataInputByteBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataInputByteBuffer.java?rev=1079163&view=auto
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataInputByteBuffer.java (added)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataInputByteBuffer.java Tue Mar 8 04:41:07 2011
@@ -0,0 +1,82 @@
+package org.apache.hadoop.io;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A {@link DataInputStream} that reads from a sequence of {@link ByteBuffer}s
+ * as though they formed one contiguous stream.
+ * <p>
+ * The buffers handed to {@link #reset(ByteBuffer...)} are not copied: reading
+ * from this stream advances the position of those buffers.
+ */
+public class DataInputByteBuffer extends DataInputStream {
+
+  /** Stream that drains an array of buffers in array order. */
+  private static class Buffer extends InputStream {
+    /** One-byte staging area used by {@link #read()}. */
+    private final byte[] scratch = new byte[1];
+    /** Buffers being read; replaced wholesale by {@link #reset}. */
+    ByteBuffer[] buffers = new ByteBuffer[0];
+    /**
+     * bidx: index of the buffer currently being read;
+     * pos: bytes returned since the last reset;
+     * length: bytes that were remaining in all buffers at the last reset.
+     */
+    int bidx, pos, length;
+
+    /**
+     * Reads a single byte.
+     *
+     * @return the byte as a value in 0..255, or -1 when every buffer is drained
+     */
+    @Override
+    public int read() {
+      if (-1 == read(scratch, 0, 1)) {
+        return -1;
+      }
+      return scratch[0] & 0xFF;
+    }
+
+    /**
+     * Copies up to {@code len} bytes into {@code b}, crossing buffer
+     * boundaries as needed.
+     *
+     * @param b destination array
+     * @param off first index of {@code b} to write
+     * @param len maximum number of bytes to copy
+     * @return the number of bytes copied, or -1 when no data is left
+     */
+    @Override
+    public int read(byte[] b, int off, int len) {
+      // A previous call that consumed a buffer exactly leaves bidx on that
+      // drained buffer. Step past it (and any empty buffers) first so that
+      // end-of-data is reported as -1 rather than as a zero-length read,
+      // which read() above would misinterpret as a successful read.
+      while (bidx < buffers.length && !buffers[bidx].hasRemaining()) {
+        ++bidx;
+      }
+      if (bidx >= buffers.length) {
+        return -1;
+      }
+      int cur = 0;
+      do {
+        int rem = Math.min(len, buffers[bidx].remaining());
+        buffers[bidx].get(b, off, rem);
+        cur += rem;
+        off += rem;
+        len -= rem;
+        // Only advance to the next buffer while the request is unsatisfied.
+      } while (len > 0 && ++bidx < buffers.length);
+      pos += cur;
+      return cur;
+    }
+
+    /**
+     * Points this stream at a new set of buffers and recomputes the length
+     * as the sum of their remaining bytes.
+     *
+     * @param buffers buffers to read, in order
+     */
+    public void reset(ByteBuffer[] buffers) {
+      bidx = pos = length = 0;
+      this.buffers = buffers;
+      for (ByteBuffer b : buffers) {
+        length += b.remaining();
+      }
+    }
+
+    /** @return the number of bytes read since the last reset */
+    public int getPosition() {
+      return pos;
+    }
+
+    /** @return the number of bytes that were available at the last reset */
+    public int getLength() {
+      return length;
+    }
+
+    /** @return the buffer array supplied to the last reset (not a copy) */
+    public ByteBuffer[] getData() {
+      return buffers;
+    }
+  }
+
+  /** The underlying stream; the same object that was passed to the superclass. */
+  private Buffer buffers;
+
+  /** Creates a stream with no data; call {@link #reset(ByteBuffer...)} to supply some. */
+  public DataInputByteBuffer() {
+    this(new Buffer());
+  }
+
+  /** Keeps a typed reference to the stream that the superclass wraps. */
+  private DataInputByteBuffer(Buffer buffers) {
+    super(buffers);
+    this.buffers = buffers;
+  }
+
+  /**
+   * Replaces the data to be read.
+   *
+   * @param input buffers to read, in order; their positions advance as data
+   *              is consumed
+   */
+  public void reset(ByteBuffer... input) {
+    buffers.reset(input);
+  }
+
+  /** @return the buffer array supplied to the last {@link #reset(ByteBuffer...)} */
+  public ByteBuffer[] getData() {
+    return buffers.getData();
+  }
+
+  /** @return the number of bytes read since the last {@link #reset(ByteBuffer...)} */
+  public int getPosition() {
+    return buffers.getPosition();
+  }
+
+  /** @return the number of bytes that were available at the last {@link #reset(ByteBuffer...)} */
+  public int getLength() {
+    return buffers.getLength();
+  }
+}
Added: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataOutputByteBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataOutputByteBuffer.java?rev=1079163&view=auto
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataOutputByteBuffer.java (added)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/io/DataOutputByteBuffer.java Tue Mar 8 04:41:07 2011
@@ -0,0 +1,119 @@
+package org.apache.hadoop.io;
+
+import java.io.DataOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+/**
+ * A {@link DataOutputStream} that accumulates its output in a growing list of
+ * {@link ByteBuffer}s instead of a single byte array.
+ * <p>
+ * Buffers are retained across {@link #reset()} and reused by later writes.
+ */
+public class DataOutputByteBuffer extends DataOutputStream {
+
+  /** Stream that appends to a chain of buffers, allocating more on demand. */
+  static class Buffer extends OutputStream {
+
+    /** One-byte staging area used by {@link #write(int)}. */
+    final byte[] b = new byte[1];
+    /** Whether new buffers are allocated with {@code allocateDirect}. */
+    final boolean direct;
+    /** Completely filled buffers, already flipped for reading, in write order. */
+    final List<ByteBuffer> active = new ArrayList<ByteBuffer>();
+    /** Buffers kept from before the last reset, waiting to be reused. */
+    final List<ByteBuffer> inactive = new LinkedList<ByteBuffer>();
+    /** Capacity of the most recently allocated buffer. */
+    int size;
+    /** Bytes written since the last reset. */
+    int length;
+    /** Buffer currently being written. */
+    ByteBuffer current;
+
+    /**
+     * @param size capacity of the first buffer
+     * @param direct true to allocate direct buffers, false for heap buffers
+     */
+    Buffer(int size, boolean direct) {
+      this.direct = direct;
+      this.size = size;
+      current = direct
+          ? ByteBuffer.allocateDirect(size)
+          : ByteBuffer.allocate(size);
+    }
+
+    /** Writes the low eight bits of {@code b}. */
+    @Override
+    public void write(int b) {
+      this.b[0] = (byte) (b & 0xFF);
+      write(this.b);
+    }
+
+    /** Writes all of {@code b}. */
+    @Override
+    public void write(byte[] b) {
+      write(b, 0, b.length);
+    }
+
+    /**
+     * Writes {@code len} bytes of {@code b} starting at {@code off}, filling
+     * the current buffer and moving on to further buffers as required.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) {
+      int rem = current.remaining();
+      while (len > rem) {
+        // Top up the current buffer, retire it to the active list in
+        // readable (flipped) form, and continue in the next buffer.
+        current.put(b, off, rem);
+        length += rem;
+        current.flip();
+        active.add(current);
+        off += rem;
+        len -= rem;
+        rem = getBuffer(len);
+      }
+      current.put(b, off, len);
+      length += len;
+    }
+
+    /**
+     * Makes {@link #current} a fresh buffer: a retained one if available,
+     * otherwise a new allocation of at least double the previous size and at
+     * least {@code newsize} bytes.
+     *
+     * @param newsize number of bytes still waiting to be written
+     * @return the space available in the new current buffer, which may be
+     *         less than {@code newsize} when a retained buffer is reused
+     */
+    int getBuffer(int newsize) {
+      if (inactive.isEmpty()) {
+        size = Math.max(size << 1, newsize);
+        current = direct
+            ? ByteBuffer.allocateDirect(size)
+            : ByteBuffer.allocate(size);
+      } else {
+        current = inactive.remove(0);
+      }
+      return current.remaining();
+    }
+
+    /**
+     * Returns read-ready views of everything written since the last reset.
+     * <p>
+     * Each element shares content with an internal buffer but has its own
+     * position and limit, so a caller that drains the returned buffers does
+     * not disturb this stream or the result of a later call.
+     *
+     * @return the filled buffers in write order, followed by a view of the
+     *         written part of the current buffer
+     */
+    ByteBuffer[] getData() {
+      final int filled = active.size();
+      final ByteBuffer[] ret = new ByteBuffer[filled + 1];
+      for (int i = 0; i < filled; ++i) {
+        ret[i] = active.get(i).duplicate();
+      }
+      ByteBuffer tmp = current.duplicate();
+      tmp.flip();
+      ret[filled] = tmp.slice();
+      return ret;
+    }
+
+    /** @return the number of bytes written since the last reset */
+    int getLength() {
+      return length;
+    }
+
+    /**
+     * Discards the written data while keeping every buffer for reuse. The
+     * buffers are queued in their original write order and the first of them
+     * becomes the current buffer.
+     */
+    void reset() {
+      length = 0;
+      current.clear();
+      inactive.add(0, current);
+      // Walk backwards and push to the front so the original order is kept.
+      for (int i = active.size() - 1; i >= 0; --i) {
+        ByteBuffer b = active.remove(i);
+        b.clear();
+        inactive.add(0, b);
+      }
+      current = inactive.remove(0);
+    }
+  }
+
+  /** The underlying stream; the same object that was passed to the superclass. */
+  private final Buffer buffers;
+
+  /** Creates a stream whose first buffer is a 32-byte heap buffer. */
+  public DataOutputByteBuffer() {
+    this(32);
+  }
+
+  /**
+   * Creates a stream backed by heap buffers.
+   *
+   * @param size capacity of the first buffer
+   */
+  public DataOutputByteBuffer(int size) {
+    this(size, false);
+  }
+
+  /**
+   * @param size capacity of the first buffer
+   * @param direct true to allocate direct buffers, false for heap buffers
+   */
+  public DataOutputByteBuffer(int size, boolean direct) {
+    this(new Buffer(size, direct));
+  }
+
+  /** Keeps a typed reference to the stream that the superclass wraps. */
+  private DataOutputByteBuffer(Buffer buffers) {
+    super(buffers);
+    this.buffers = buffers;
+  }
+
+  /**
+   * @return read-ready views of the data written since the last
+   *         {@link #reset()}, in write order
+   */
+  public ByteBuffer[] getData() {
+    return buffers.getData();
+  }
+
+  /** @return the number of bytes written since the last {@link #reset()} */
+  public int getLength() {
+    return buffers.getLength();
+  }
+
+  /** Discards the written data and zeroes the inherited byte counter. */
+  public void reset() {
+    this.written = 0;
+    buffers.reset();
+  }
+
+}
Added: hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/io/TestDataByteBuffers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/io/TestDataByteBuffers.java?rev=1079163&view=auto
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/io/TestDataByteBuffers.java (added)
+++ hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/io/TestDataByteBuffers.java Tue Mar 8 04:41:07 2011
@@ -0,0 +1,177 @@
+package org.apache.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Round-trip and compatibility tests for {@link DataInputByteBuffer} and
+ * {@link DataOutputByteBuffer} against the byte[]-based
+ * {@link DataInputBuffer} and {@link DataOutputBuffer}.
+ * <p>
+ * Data is produced by a seeded {@link Random}; the reader re-seeds the same
+ * generator and must draw from it in exactly the order the writer did.
+ */
+public class TestDataByteBuffers {
+
+  /**
+   * Replays the random sequence for {@code seed} and checks that {@code in}
+   * yields the values {@link #writeJunk} would have written.
+   */
+  private static void readJunk(DataInput in, Random r, long seed, int iter)
+      throws IOException {
+    r.setSeed(seed);
+    for (int n = iter; n > 0; --n) {
+      final int kind = r.nextInt(7);
+      switch (kind) {
+        case 0: {
+          final byte expected = (byte) (r.nextInt() & 0xFF);
+          assertEquals(expected, in.readByte());
+          break;
+        }
+        case 1: {
+          final short expected = (short) (r.nextInt() & 0xFFFF);
+          assertEquals(expected, in.readShort());
+          break;
+        }
+        case 2: {
+          final int expected = r.nextInt();
+          assertEquals(expected, in.readInt());
+          break;
+        }
+        case 3: {
+          final long expected = r.nextLong();
+          assertEquals(expected, in.readLong());
+          break;
+        }
+        case 4: {
+          // Compare bit patterns rather than floating-point values.
+          final long expected = Double.doubleToLongBits(r.nextDouble());
+          assertEquals(expected, Double.doubleToLongBits(in.readDouble()));
+          break;
+        }
+        case 5: {
+          final int expected = Float.floatToIntBits(r.nextFloat());
+          assertEquals(expected, Float.floatToIntBits(in.readFloat()));
+          break;
+        }
+        case 6: {
+          final int count = r.nextInt(1024);
+          final byte[] expected = new byte[count];
+          r.nextBytes(expected);
+          final byte[] actual = new byte[count];
+          in.readFully(actual, 0, count);
+          assertArrayEquals(expected, actual);
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes {@code iter} randomly chosen primitives or byte arrays to
+   * {@code out}, drawing everything from {@code r} seeded with {@code seed}.
+   */
+  private static void writeJunk(DataOutput out, Random r, long seed, int iter)
+      throws IOException {
+    r.setSeed(seed);
+    for (int n = iter; n > 0; --n) {
+      final int kind = r.nextInt(7);
+      switch (kind) {
+        case 0:
+          out.writeByte(r.nextInt());
+          break;
+        case 1:
+          out.writeShort((short) (r.nextInt() & 0xFFFF));
+          break;
+        case 2:
+          out.writeInt(r.nextInt());
+          break;
+        case 3:
+          out.writeLong(r.nextLong());
+          break;
+        case 4:
+          out.writeDouble(r.nextDouble());
+          break;
+        case 5:
+          out.writeFloat(r.nextFloat());
+          break;
+        case 6: {
+          final byte[] payload = new byte[r.nextInt(1024)];
+          r.nextBytes(payload);
+          out.write(payload);
+          break;
+        }
+      }
+    }
+  }
+
+  /** Draws a seed from {@code r}, applies it, and prints it for reproduction. */
+  private static long chooseSeed(Random r) {
+    final long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    return seed;
+  }
+
+  /** Baseline: the byte[]-based buffers round-trip, also after a reset. */
+  @Test
+  public void testBaseBuffers() throws IOException {
+    final DataOutputBuffer out = new DataOutputBuffer();
+    final Random rand = new Random();
+    final long seed = chooseSeed(rand);
+    final DataInputBuffer in = new DataInputBuffer();
+    for (int round = 0; round < 2; ++round) {
+      if (round > 0) {
+        out.reset();
+      }
+      writeJunk(out, rand, seed, 1000);
+      in.reset(out.getData(), 0, out.getLength());
+      readJunk(in, rand, seed, 1000);
+    }
+  }
+
+  /** The ByteBuffer-based buffers round-trip, also after a reset. */
+  @Test
+  public void testByteBuffers() throws IOException {
+    final DataOutputByteBuffer out = new DataOutputByteBuffer();
+    final Random rand = new Random();
+    final long seed = chooseSeed(rand);
+    final DataInputByteBuffer in = new DataInputByteBuffer();
+    for (int round = 0; round < 2; ++round) {
+      if (round > 0) {
+        out.reset();
+      }
+      writeJunk(out, rand, seed, 1000);
+      in.reset(out.getData());
+      readJunk(in, rand, seed, 1000);
+    }
+  }
+
+  /** Concatenates the remaining bytes of {@code bufs} into an array of {@code len} bytes. */
+  private static byte[] toBytes(ByteBuffer[] bufs, int len) {
+    final byte[] joined = new byte[len];
+    int filled = 0;
+    for (ByteBuffer buf : bufs) {
+      final int avail = buf.remaining();
+      buf.get(joined, filled, avail);
+      filled += avail;
+    }
+    return joined;
+  }
+
+  /** Asserts that both outputs hold the same number of bytes with the same content. */
+  private static void assertSameContent(DataOutputBuffer expected,
+      DataOutputByteBuffer actual) {
+    final byte[] check = toBytes(actual.getData(), actual.getLength());
+    assertEquals(expected.getLength(), check.length);
+    assertArrayEquals(
+        Arrays.copyOf(expected.getData(), expected.getLength()), check);
+  }
+
+  /**
+   * Both output implementations produce identical bytes, including after
+   * resets followed by a larger and then a smaller amount of data.
+   */
+  @Test
+  public void testDataOutputByteBufferCompatibility() throws IOException {
+    final DataOutputBuffer dob = new DataOutputBuffer();
+    final DataOutputByteBuffer dobb = new DataOutputByteBuffer();
+    final Random rand = new Random();
+    final long seed = chooseSeed(rand);
+    final int[] rounds = { 1000, 3000, 1000 };
+    for (int i = 0; i < rounds.length; ++i) {
+      if (i > 0) {
+        dob.reset();
+        dobb.reset();
+      }
+      writeJunk(dob, rand, seed, rounds[i]);
+      writeJunk(dobb, rand, seed, rounds[i]);
+      assertSameContent(dob, dobb);
+    }
+  }
+
+  /** Data written by DataOutputBuffer is readable through DataInputByteBuffer. */
+  @Test
+  public void TestDataInputByteBufferCompatibility() throws IOException {
+    final DataOutputBuffer out = new DataOutputBuffer();
+    final Random rand = new Random();
+    final long seed = chooseSeed(rand);
+    writeJunk(out, rand, seed, 1000);
+    final ByteBuffer wrapped =
+        ByteBuffer.wrap(out.getData(), 0, out.getLength());
+
+    final DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(wrapped);
+    readJunk(in, rand, seed, 1000);
+  }
+
+  /** Data written by DataOutputByteBuffer is readable through DataInputBuffer. */
+  @Test
+  public void TestDataOutputByteBufferCompatibility() throws IOException {
+    final DataOutputByteBuffer out = new DataOutputByteBuffer();
+    final Random rand = new Random();
+    final long seed = chooseSeed(rand);
+    writeJunk(out, rand, seed, 1000);
+    // Flatten the buffer chain into one array-backed buffer.
+    final ByteBuffer flat = ByteBuffer.allocate(out.getLength());
+    for (ByteBuffer piece : out.getData()) {
+      flat.put(piece);
+    }
+    flat.flip();
+    final DataInputBuffer in = new DataInputBuffer();
+    in.reset(flat.array(), 0, flat.remaining());
+    readJunk(in, rand, seed, 1000);
+  }
+
+}