You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/04/24 00:10:13 UTC

svn commit: r768073 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java

Author: cdouglas
Date: Thu Apr 23 22:10:13 2009
New Revision: 768073

URL: http://svn.apache.org/viewvc?rev=768073&view=rev
Log:
HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
TupleWritable encoding. Contributed by Jingkei Ly

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=768073&r1=768072&r2=768073&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr 23 22:10:13 2009
@@ -253,6 +253,9 @@
     HADOOP-5705. Improve TotalOrderPartitioner efficiency by updating the trie
     construction. (Dick King via cdouglas)
 
+    HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
+    TupleWritable encoding. (Jingkei Ly via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java?rev=768073&r1=768072&r2=768073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/TupleWritable.java Thu Apr 23 22:10:13 2009
@@ -21,6 +21,7 @@
 import java.io.DataOutput;
 import java.io.DataInput;
 import java.io.IOException;
+import java.util.BitSet;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
@@ -42,20 +43,22 @@
  */
 public class TupleWritable implements Writable, Iterable<Writable> {
 
-  private long written;
+  private BitSet written;
   private Writable[] values;
 
   /**
    * Create an empty tuple with no allocated storage for writables.
    */
-  public TupleWritable() { }
+  public TupleWritable() {
+    written = new BitSet(0);
+  }
 
   /**
    * Initialize tuple with storage; unknown whether any of them contain
    * &quot;written&quot; values.
    */
   public TupleWritable(Writable[] vals) {
-    written = 0L;
+    written = new BitSet(vals.length);
     values = vals;
   }
 
@@ -63,7 +66,7 @@
    * Return true if tuple has an element at the position provided.
    */
   public boolean has(int i) {
-    return 0 != ((1L << i) & written);
+    return written.get(i);
   }
 
   /**
@@ -86,7 +89,7 @@
   public boolean equals(Object other) {
     if (other instanceof TupleWritable) {
       TupleWritable that = (TupleWritable)other;
-      if (this.size() != that.size() || this.written != that.written) {
+      if (this.size() != that.size() || !this.written.equals(that.written)) {
         return false;
       }
       for (int i = 0; i < values.length; ++i) {
@@ -102,7 +105,7 @@
 
   public int hashCode() {
     assert false : "hashCode not designed";
-    return (int)written;
+    return written.hashCode();
   }
 
   /**
@@ -113,24 +116,27 @@
   public Iterator<Writable> iterator() {
     final TupleWritable t = this;
     return new Iterator<Writable>() {
-      long i = written;
-      long last = 0L;
+      // Next set position to return, or -1 once iteration is exhausted.
+      int bitIndex = written.nextSetBit(0);
+      // Position returned by the latest next(), or -1 before the first call.
+      int lastReturned = -1;
       public boolean hasNext() {
-        return 0L != i;
+        return bitIndex >= 0;
       }
       public Writable next() {
-        last = Long.lowestOneBit(i);
-        if (0 == last)
+        if (bitIndex < 0)
           throw new NoSuchElementException();
-        i ^= last;
-        // numberOfTrailingZeros rtn 64 if lsb set
-        return t.get(Long.numberOfTrailingZeros(last) % 64);
+        lastReturned = bitIndex;
+        bitIndex = written.nextSetBit(bitIndex+1);
+        return t.get(lastReturned);
       }
       public void remove() {
-        t.written ^= last;
-        if (t.has(Long.numberOfTrailingZeros(last))) {
+        // bitIndex has already moved past the element to remove, so act on
+        // lastReturned; a repeated remove() finds the bit cleared and fails.
+        if (lastReturned < 0 || !written.get(lastReturned)) {
           throw new IllegalStateException("Attempt to remove non-existent val");
         }
+        written.clear(lastReturned);
       }
     };
   }
@@ -162,7 +168,7 @@
    */
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, values.length);
-    WritableUtils.writeVLong(out, written);
+    writeBitSet(out, values.length, written);
     for (int i = 0; i < values.length; ++i) {
       Text.writeString(out, values[i].getClass().getName());
     }
@@ -180,7 +186,7 @@
   public void readFields(DataInput in) throws IOException {
     int card = WritableUtils.readVInt(in);
     values = new Writable[card];
-    written = WritableUtils.readVLong(in);
+    readBitSet(in, card, written);
     Class<? extends Writable>[] cls = new Class[card];
     try {
       for (int i = 0; i < card; ++i) {
@@ -205,7 +211,7 @@
    * Record that the tuple contains an element at the position provided.
    */
   void setWritten(int i) {
-    written |= 1L << i;
+    written.set(i);
   }
 
   /**
@@ -213,7 +219,7 @@
    * provided.
    */
   void clearWritten(int i) {
-    written &= -1 ^ (1L << i);
+    written.clear(i);
   }
 
   /**
@@ -221,7 +227,67 @@
    * releasing storage.
    */
   void clearWritten() {
-    written = 0L;
+    written.clear();
+  }
+
+  /**
+   * Writes the bit set to the stream. The first 64 bit-positions of the bit set
+   * are written as a VLong for backwards-compatibility with older versions of
+   * TupleWritable. All bit-positions >= 64 are encoded as a byte for every 8
+   * bit-positions.
+   * <p>
+   * When nbits > 64, exactly ceil((nbits - 64) / 8) bytes follow the VLong,
+   * including trailing all-zero bytes, because
+   * {@link #readBitSet(DataInput, int, BitSet)} always consumes that many.
+   */
+  private static final void writeBitSet(DataOutput stream, int nbits, BitSet bitSet)
+      throws IOException {
+    long bits = 0L;
+        
+    int bitSetIndex = bitSet.nextSetBit(0);
+    for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
+            bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
+      bits |= 1L << bitSetIndex;
+    }
+    WritableUtils.writeVLong(stream,bits);
+    
+    if (nbits > Long.SIZE) {
+      // One byte per 8 positions in [64, nbits), even when the top positions
+      // are unset, so the byte count always matches what readBitSet consumes.
+      byte[] highBits = new byte[(nbits - Long.SIZE + Byte.SIZE - 1) / Byte.SIZE];
+      for (; bitSetIndex >= 0 && bitSetIndex < nbits;
+              bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
+        int offset = bitSetIndex - Long.SIZE;
+        highBits[offset / Byte.SIZE] |= (byte) (1 << (offset % Byte.SIZE));
+      }
+      stream.write(highBits);
+    }
   }
 
+  /**
+   * Reads a bitset from the stream that has been written with
+   * {@link #writeBitSet(DataOutput, int, BitSet)}.
+   */
+  private static final void readBitSet(DataInput stream, int nbits, 
+      BitSet bitSet) throws IOException {
+    bitSet.clear();
+    long initialBits = WritableUtils.readVLong(stream);
+    long last = 0L;
+    while (0L != initialBits) {
+      last = Long.lowestOneBit(initialBits);
+      initialBits ^= last;
+      bitSet.set(Long.numberOfTrailingZeros(last));
+    }
+    
+    for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
+      byte bits = stream.readByte();
+      // A negative byte sign-extends when widened to long, but its lowest set
+      // bit is still in 0..7, and the compound ^= narrows back to a byte.
+      while (0 != bits) {
+        last = Long.lowestOneBit(bits);
+        bits ^= last;
+        bitSet.set(Long.numberOfTrailingZeros(last) + offset);
+      }
+    }
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java?rev=768073&r1=768072&r2=768073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java Thu Apr 23 22:10:13 2009
@@ -20,8 +20,9 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
-
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
@@ -34,6 +35,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 public class TestTupleWritable extends TestCase {
 
@@ -58,7 +60,63 @@
     }
     return ret;
   }
-
+
+  /**
+   * Returns ten newly generated writables of assorted types.
+   */
+  private Writable[] makeRandomWritables() {
+    Random r = new Random();
+    Writable[] writs = {
+      new BooleanWritable(r.nextBoolean()),
+      new FloatWritable(r.nextFloat()),
+      new FloatWritable(r.nextFloat()),
+      new IntWritable(r.nextInt()),
+      new LongWritable(r.nextLong()),
+      new BytesWritable("dingo".getBytes()),
+      new LongWritable(r.nextLong()),
+      new IntWritable(r.nextInt()),
+      new BytesWritable("yak".getBytes()),
+      new IntWritable(r.nextInt())
+    };
+    return writs;
+  }
+
+  /**
+   * Returns numWrits writables by cycling through one call to
+   * {@link #makeRandomWritables()}, so entries may share instances.
+   */
+  private Writable[] makeRandomWritables(int numWrits) {
+    Writable[] writs = makeRandomWritables();
+    Writable[] manyWrits = new Writable[numWrits];
+    for (int i = 0; i < manyWrits.length; i++) {
+      manyWrits[i] = writs[i % writs.length];
+    }
+    return manyWrits;
+  }
+
+  /**
+   * Deserializes a TupleWritable from the given bytes and asserts that every
+   * byte was consumed, i.e. that reader and writer agree on the encoding size.
+   */
+  private TupleWritable readTuple(byte[] bytes) throws IOException {
+    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+    TupleWritable dTuple = new TupleWritable();
+    dTuple.readFields(new DataInputStream(in));
+    assertEquals("All tuple data has not been read from the stream",-1,in.read());
+    return dTuple;
+  }
+
+  /**
+   * Writes the tuple, reads it back with {@link #readTuple(byte[])}, and
+   * asserts the copy equals the original.
+   */
+  private void assertRoundTrip(TupleWritable sTuple) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    sTuple.write(new DataOutputStream(out));
+    TupleWritable dTuple = readTuple(out.toByteArray());
+    assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
+  }
+
   private int verifIter(Writable[] writs, TupleWritable t, int i) {
     for (Writable w : t) {
       if (w instanceof TupleWritable) {
@@ -132,6 +190,44 @@
     assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
   }
 
+  public void testWideWritable() throws Exception {
+    Writable[] manyWrits = makeRandomWritables(131);
+
+    TupleWritable sTuple = new TupleWritable(manyWrits);
+    for (int i = 0; i < manyWrits.length; i++) {
+      if (i % 3 == 0) {
+        sTuple.setWritten(i);
+      }
+    }
+    assertRoundTrip(sTuple);
+  }
+
+  public void testWideWritable2() throws Exception {
+    Writable[] manyWrits = makeRandomWritables(71);
+
+    TupleWritable sTuple = new TupleWritable(manyWrits);
+    for (int i = 0; i < manyWrits.length; i++) {
+      sTuple.setWritten(i);
+    }
+    assertRoundTrip(sTuple);
+  }
+
+  /**
+   * Tests a tuple writable with more than 64 values and the values set written
+   * spread far apart.
+   */
+  public void testSparseWideWritable() throws Exception {
+    Writable[] manyWrits = makeRandomWritables(131);
+
+    TupleWritable sTuple = new TupleWritable(manyWrits);
+    for (int i = 0; i < manyWrits.length; i++) {
+      if (i % 65 == 0) {
+        sTuple.setWritten(i);
+      }
+    }
+    assertRoundTrip(sTuple);
+  }
+
   public void testWideTuple() throws Exception {
     Text emptyText = new Text("Should be empty");
     Writable[] values = new Writable[64];
@@ -171,4 +267,120 @@
       }
     }
   }
+
+  /**
+   * Tests that position 64, the first one beyond the legacy 64-bit mask,
+   * can be marked written without marking any other position.
+   */
+  public void testWideTupleBoundary() throws Exception {
+    Text emptyText = new Text("Should not be set written");
+    Writable[] values = new Writable[65];
+    Arrays.fill(values,emptyText);
+    values[64] = new Text("Should be the only value set written");
+
+    TupleWritable tuple = new TupleWritable(values);
+    tuple.setWritten(64);
+
+    for (int pos=0; pos<tuple.size();pos++) {
+      boolean has = tuple.has(pos);
+      if (pos == 64) {
+        assertTrue(has);
+      } else {
+        assertFalse("Tuple position is incorrectly labelled as set: " + pos, has);
+      }
+    }
+  }
+
+  /**
+   * Tests compatibility with pre-0.21 versions of TupleWritable.
+   */
+  public void testPreVersion21Compatibility() throws Exception {
+    Writable[] manyWrits = makeRandomWritables(64);
+    PreVersion21TupleWritable oldTuple = new PreVersion21TupleWritable(manyWrits);
+
+    for (int i = 0; i < manyWrits.length; i++) {
+      if (i % 3 == 0) {
+        oldTuple.setWritten(i);
+      }
+    }
+    assertReadableAsTuple(oldTuple);
+  }
+
+  public void testPreVersion21CompatibilityEmptyTuple() throws Exception {
+    Writable[] manyWrits = new Writable[0];
+    PreVersion21TupleWritable oldTuple = new PreVersion21TupleWritable(manyWrits);
+    // don't set any values written
+    assertReadableAsTuple(oldTuple);
+  }
+
+  /**
+   * Writes oldTuple in the pre-0.21 format and asserts that the current
+   * TupleWritable reads back the same size, written positions and values.
+   */
+  private void assertReadableAsTuple(PreVersion21TupleWritable oldTuple)
+      throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    oldTuple.write(new DataOutputStream(out));
+    TupleWritable dTuple = readTuple(out.toByteArray());
+    assertTrue("Tuple writable is unable to read pre-0.21 versions of TupleWritable", oldTuple.isCompatible(dTuple));
+  }
+
+  /**
+   * Writes to the DataOutput stream in the same way as pre-0.21 versions of
+   * {@link TupleWritable#write(DataOutput)}.
+   */
+  private static class PreVersion21TupleWritable {
+
+    private Writable[] values;
+    private long written;
+
+    private PreVersion21TupleWritable(Writable[] vals) {
+      written = 0L;
+      values = vals;
+    }
+
+    private void setWritten(int i) {
+      written |= 1L << i;
+    }
+
+    private boolean has(int i) {
+      return 0 != ((1L << i) & written);
+    }
+
+    private void write(DataOutput out) throws IOException {
+      WritableUtils.writeVInt(out, values.length);
+      WritableUtils.writeVLong(out, written);
+      for (int i = 0; i < values.length; ++i) {
+        Text.writeString(out, values[i].getClass().getName());
+      }
+      for (int i = 0; i < values.length; ++i) {
+        if (has(i)) {
+          values[i].write(out);
+        }
+      }
+    }
+
+    public int size() {
+      return values.length;
+    }
+
+    /**
+     * Returns true if that has the same size, written positions and written
+     * values as this tuple.
+     */
+    public boolean isCompatible(TupleWritable that) {
+      if (this.size() != that.size()) {
+        return false;
+      }
+      for (int i = 0; i < values.length; ++i) {
+        if (has(i) != that.has(i)) {
+          return false;
+        }
+        if (has(i) && !values[i].equals(that.get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
 }