You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/04/24 00:10:13 UTC
svn commit: r768073 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java
src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
Author: cdouglas
Date: Thu Apr 23 22:10:13 2009
New Revision: 768073
URL: http://svn.apache.org/viewvc?rev=768073&view=rev
Log:
HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
TupleWritable encoding. Contributed by Jingkei Ly
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=768073&r1=768072&r2=768073&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 23 22:10:13 2009
@@ -253,6 +253,9 @@
HADOOP-5705. Improve TotalOrderPartitioner efficiency by updating the trie
construction. (Dick King via cdouglas)
+ HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
+ TupleWritable encoding. (Jingkei Ly via cdouglas)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java?rev=768073&r1=768072&r2=768073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java Thu Apr 23 22:10:13 2009
@@ -21,6 +21,7 @@
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
+import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -42,20 +43,22 @@
*/
public class TupleWritable implements Writable, Iterable<Writable> {
- private long written;
+ private BitSet written;
private Writable[] values;
/**
* Create an empty tuple with no allocated storage for writables.
*/
- public TupleWritable() { }
+ public TupleWritable() {
+ written = new BitSet(0);
+ }
/**
* Initialize tuple with storage; it is unknown whether any of the supplied
* writables contain "written" values.
*/
public TupleWritable(Writable[] vals) {
- written = 0L;
+ written = new BitSet(vals.length);
values = vals;
}
@@ -63,7 +66,7 @@
* Return true if tuple has an element at the position provided.
*/
public boolean has(int i) {
- return 0 != ((1L << i) & written);
+ return written.get(i);
}
/**
@@ -86,7 +89,7 @@
public boolean equals(Object other) {
if (other instanceof TupleWritable) {
TupleWritable that = (TupleWritable)other;
- if (this.size() != that.size() || this.written != that.written) {
+ if (!this.written.equals(that.written)) {
return false;
}
for (int i = 0; i < values.length; ++i) {
@@ -102,7 +105,7 @@
public int hashCode() {
assert false : "hashCode not designed";
- return (int)written;
+ return written.hashCode();
}
/**
@@ -113,24 +116,22 @@
public Iterator<Writable> iterator() {
final TupleWritable t = this;
return new Iterator<Writable>() {
- long i = written;
- long last = 0L;
+ int bitIndex = written.nextSetBit(0);
public boolean hasNext() {
- return 0L != i;
+ return bitIndex >= 0;
}
public Writable next() {
- last = Long.lowestOneBit(i);
- if (0 == last)
+ int returnIndex = bitIndex;
+ if (returnIndex < 0)
throw new NoSuchElementException();
- i ^= last;
- // numberOfTrailingZeros rtn 64 if lsb set
- return t.get(Long.numberOfTrailingZeros(last) % 64);
+ bitIndex = written.nextSetBit(bitIndex+1);
+ return t.get(returnIndex);
}
public void remove() {
- t.written ^= last;
- if (t.has(Long.numberOfTrailingZeros(last))) {
+ if (!written.get(bitIndex)) {
throw new IllegalStateException("Attempt to remove non-existent val");
}
+ written.clear(bitIndex);
}
};
}
@@ -162,7 +163,7 @@
*/
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, values.length);
- WritableUtils.writeVLong(out, written);
+ writeBitSet(out, values.length, written);
for (int i = 0; i < values.length; ++i) {
Text.writeString(out, values[i].getClass().getName());
}
@@ -180,7 +181,7 @@
public void readFields(DataInput in) throws IOException {
int card = WritableUtils.readVInt(in);
values = new Writable[card];
- written = WritableUtils.readVLong(in);
+ readBitSet(in, card, written);
Class<? extends Writable>[] cls = new Class[card];
try {
for (int i = 0; i < card; ++i) {
@@ -205,7 +206,7 @@
* Record that the tuple contains an element at the position provided.
*/
void setWritten(int i) {
- written |= 1L << i;
+ written.set(i);
}
/**
@@ -213,7 +214,7 @@
* provided.
*/
void clearWritten(int i) {
- written &= -1 ^ (1L << i);
+ written.clear(i);
}
/**
@@ -221,7 +222,67 @@
* releasing storage.
*/
void clearWritten() {
- written = 0L;
+ written.clear();
+ }
+
+ /**
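+ * Writes the bit set to the stream. The first 64 bit-positions of the bit set
+ * are written as a single VLong for backwards-compatibility with older
+ * versions of TupleWritable. When nbits exceeds 64, bit-positions >= 64 and
+ * below nbits are encoded as one byte for every 8 bit-positions, with the
+ * lowest bit-position of each group in the lowest-order bit of its byte.
+ * NOTE(review): bytes are emitted only up to the one holding the highest set
+ * bit-position (at least one byte), whereas readBitSet always reads one byte
+ * per 8 bit-positions up to nbits; the two appear to disagree when the
+ * trailing bit-positions are all clear -- confirm before relying on this.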
+ */
+ private static final void writeBitSet(DataOutput stream, int nbits, BitSet bitSet)
+ throws IOException {
+ long bits = 0L;
+
+ int bitSetIndex = bitSet.nextSetBit(0);
+ for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
+ bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
+ bits |= 1L << bitSetIndex;
+ }
+ WritableUtils.writeVLong(stream,bits);
+
+ if (nbits > Long.SIZE) {
+ bits = 0L;
+ for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits;
+ bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
+ int bitsIndex = bitSetIndex % Byte.SIZE;
+ int word = (bitSetIndex-Long.SIZE) / Byte.SIZE;
+ if (word > lastWordWritten) {
+ stream.writeByte((byte)bits);
+ bits = 0L;
+ for (lastWordWritten++;lastWordWritten<word;lastWordWritten++) {
+ stream.writeByte((byte)bits);
+ }
+ }
+ bits |= 1L << bitsIndex;
+ }
+ stream.writeByte((byte)bits);
+ }
}
+ /**
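+ * Reads a bit set from the stream that has been written with
+ * {@link #writeBitSet(DataOutput, int, BitSet)}. The supplied bit set is
+ * cleared first; a VLong supplies bit-positions 0 to 63, and then one byte is
+ * read for every 8 bit-positions from 64 up to nbits.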
+ */
+ private static final void readBitSet(DataInput stream, int nbits,
+ BitSet bitSet) throws IOException {
+ bitSet.clear();
+ long initialBits = WritableUtils.readVLong(stream);
+ long last = 0L;
+ while (0L != initialBits) {
+ last = Long.lowestOneBit(initialBits);
+ initialBits ^= last;
+ bitSet.set(Long.numberOfTrailingZeros(last));
+ }
+
+ for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
+ byte bits = stream.readByte();
+ while (0 != bits) {
+ last = Long.lowestOneBit(bits);
+ bits ^= last;
+ bitSet.set(Long.numberOfTrailingZeros(last) + offset);
+ }
+ }
+ }
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java?rev=768073&r1=768072&r2=768073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java Thu Apr 23 22:10:13 2009
@@ -20,8 +20,9 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
-
+import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
@@ -34,6 +35,7 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
public class TestTupleWritable extends TestCase {
@@ -58,7 +60,35 @@
}
return ret;
}
-
+
+ private Writable[] makeRandomWritables() {
+ Random r = new Random();
+ Writable[] writs = {
+ new BooleanWritable(r.nextBoolean()),
+ new FloatWritable(r.nextFloat()),
+ new FloatWritable(r.nextFloat()),
+ new IntWritable(r.nextInt()),
+ new LongWritable(r.nextLong()),
+ new BytesWritable("dingo".getBytes()),
+ new LongWritable(r.nextLong()),
+ new IntWritable(r.nextInt()),
+ new BytesWritable("yak".getBytes()),
+ new IntWritable(r.nextInt())
+ };
+ return writs;
+ }
+
+ private Writable[] makeRandomWritables(int numWrits)
+ {
+ Writable[] writs = makeRandomWritables();
+ Writable[] manyWrits = new Writable[numWrits];
+ for (int i =0; i<manyWrits.length; i++)
+ {
+ manyWrits[i] = writs[i%writs.length];
+ }
+ return manyWrits;
+ }
+
private int verifIter(Writable[] writs, TupleWritable t, int i) {
for (Writable w : t) {
if (w instanceof TupleWritable) {
@@ -132,6 +162,65 @@
assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
}
+ public void testWideWritable() throws Exception {
+ Writable[] manyWrits = makeRandomWritables(131);
+
+ TupleWritable sTuple = new TupleWritable(manyWrits);
+ for (int i =0; i<manyWrits.length; i++)
+ {
+ if (i % 3 == 0) {
+ sTuple.setWritten(i);
+ }
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ sTuple.write(new DataOutputStream(out));
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ TupleWritable dTuple = new TupleWritable();
+ dTuple.readFields(new DataInputStream(in));
+ assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
+ assertEquals("All tuple data has not been read from the stream",-1,in.read());
+ }
+
+ public void testWideWritable2() throws Exception {
+ Writable[] manyWrits = makeRandomWritables(71);
+
+ TupleWritable sTuple = new TupleWritable(manyWrits);
+ for (int i =0; i<manyWrits.length; i++)
+ {
+ sTuple.setWritten(i);
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ sTuple.write(new DataOutputStream(out));
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ TupleWritable dTuple = new TupleWritable();
+ dTuple.readFields(new DataInputStream(in));
+ assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
+ assertEquals("All tuple data has not been read from the stream",-1,in.read());
+ }
+
+ /**
+ * Tests a tuple writable with more than 64 values in which the positions
+ * marked as written are spread far apart.
+ */
+ public void testSparseWideWritable() throws Exception {
+ Writable[] manyWrits = makeRandomWritables(131);
+
+ TupleWritable sTuple = new TupleWritable(manyWrits);
+ for (int i =0; i<manyWrits.length; i++)
+ {
+ if (i % 65 == 0) {
+ sTuple.setWritten(i);
+ }
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ sTuple.write(new DataOutputStream(out));
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ TupleWritable dTuple = new TupleWritable();
+ dTuple.readFields(new DataInputStream(in));
+ assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
+ assertEquals("All tuple data has not been read from the stream",-1,in.read());
+ }
+
public void testWideTuple() throws Exception {
Text emptyText = new Text("Should be empty");
Writable[] values = new Writable[64];
@@ -171,4 +260,116 @@
}
}
}
+
+ /**
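+ * Tests that a position beyond the first 64 (index 64 of a 65-value tuple)
+ * can be marked as written without any other position being reported as set.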
+ */
+ public void testWideTupleBoundary() throws Exception {
+ Text emptyText = new Text("Should not be set written");
+ Writable[] values = new Writable[65];
+ Arrays.fill(values,emptyText);
+ values[64] = new Text("Should be the only value set written");
+
+ TupleWritable tuple = new TupleWritable(values);
+ tuple.setWritten(64);
+
+ for (int pos=0; pos<tuple.size();pos++) {
+ boolean has = tuple.has(pos);
+ if (pos == 64) {
+ assertTrue(has);
+ }
+ else {
+ assertFalse("Tuple position is incorrectly labelled as set: " + pos, has);
+ }
+ }
+ }
+
+ /**
+ * Tests compatibility with pre-0.21 versions of TupleWritable
+ */
+ public void testPreVersion21Compatibility() throws Exception {
+ Writable[] manyWrits = makeRandomWritables(64);
+ PreVersion21TupleWritable oldTuple = new PreVersion21TupleWritable(manyWrits);
+
+ for (int i =0; i<manyWrits.length; i++) {
+ if (i % 3 == 0) {
+ oldTuple.setWritten(i);
+ }
+ }
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ oldTuple.write(new DataOutputStream(out));
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ TupleWritable dTuple = new TupleWritable();
+ dTuple.readFields(new DataInputStream(in));
+ assertTrue("Tuple writable is unable to read pre-0.21 versions of TupleWritable", oldTuple.isCompatible(dTuple));
+ assertEquals("All tuple data has not been read from the stream",-1,in.read());
+ }
+
+ public void testPreVersion21CompatibilityEmptyTuple() throws Exception {
+ Writable[] manyWrits = new Writable[0];
+ PreVersion21TupleWritable oldTuple = new PreVersion21TupleWritable(manyWrits);
+ // don't set any values written
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ oldTuple.write(new DataOutputStream(out));
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ TupleWritable dTuple = new TupleWritable();
+ dTuple.readFields(new DataInputStream(in));
+ assertTrue("Tuple writable is unable to read pre-0.21 versions of TupleWritable", oldTuple.isCompatible(dTuple));
+ assertEquals("All tuple data has not been read from the stream",-1,in.read());
+ }
+
+ /**
+ * Writes to the DataOutput stream in the same way as pre-0.21 versions of
+ * {@link TupleWritable#write(DataOutput)}
+ */
+ private static class PreVersion21TupleWritable {
+
+ private Writable[] values;
+ private long written = 0L;
+
+ private PreVersion21TupleWritable(Writable[] vals) {
+ written = 0L;
+ values = vals;
+ }
+
+ private void setWritten(int i) {
+ written |= 1L << i;
+ }
+
+ private boolean has(int i) {
+ return 0 != ((1L << i) & written);
+ }
+
+ private void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, values.length);
+ WritableUtils.writeVLong(out, written);
+ for (int i = 0; i < values.length; ++i) {
+ Text.writeString(out, values[i].getClass().getName());
+ }
+ for (int i = 0; i < values.length; ++i) {
+ if (has(i)) {
+ values[i].write(out);
+ }
+ }
+ }
+
+ public int size() {
+ return values.length;
+ }
+
+ public boolean isCompatible(TupleWritable that) {
+ if (this.size() != that.size()) {
+ return false;
+ }
+ for (int i = 0; i < values.length; ++i) {
+ if (has(i)!=that.has(i)) {
+ return false;
+ }
+ if (has(i) && !values[i].equals(that.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
}