Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/10/13 00:07:17 UTC

svn commit: r824516 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/io/file/tfile/ src/test/core/org/apache/hadoop/io/file/tfile/

Author: ddas
Date: Mon Oct 12 22:07:17 2009
New Revision: 824516

URL: http://svn.apache.org/viewvc?rev=824516&view=rev
Log:
HADOOP-6218. Adds a feature where TFile can be split by Record Sequence number. Contributed by Hong Tang and Raghu Angadi.
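
A minimal usage sketch for the new record-number APIs (not part of the patch; fs, path and conf are assumed to be set up as in the tests below):

    // Sketch: read records [1000, 2000) of a TFile and track record numbers.
    // Assumes fs, path and conf are initialized by the caller; IOExceptions propagate.
    TFile.Reader reader =
        new TFile.Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
    try {
      TFile.Reader.Scanner scanner = reader.createScannerByRecordNum(1000, 2000);
      BytesWritable key = new BytesWritable();
      BytesWritable value = new BytesWritable();
      while (!scanner.atEnd()) {
        long recNum = scanner.getRecordNum(); // record number under the cursor
        scanner.entry().get(key, value);      // copy out the current key-value pair
        scanner.advance();
      }
      scanner.close();
    } finally {
      reader.close();
    }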

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/TFile.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=824516&r1=824515&r2=824516&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Mon Oct 12 22:07:17 2009
@@ -10,6 +10,9 @@
     hadoop-config.sh so that it allows setting java command options for
     JAVA_PLATFORM.  (Koji Noguchi via szetszwo)
 
+    HADOOP-6218. Adds a feature where TFile can be split by Record
+    Sequence number. (Hong Tang and Raghu Angadi via ddas)
+
   IMPROVEMENTS
 
     HADOOP-6283. Improve the exception messages thrown by

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/TFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/TFile.java?rev=824516&r1=824515&r2=824516&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/TFile.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/TFile.java Mon Oct 12 22:07:17 2009
@@ -669,10 +669,10 @@
    * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner.
    * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}
    * ) , a portion of TFile based on byte offsets (
-   * {@link Reader#createScanner(long, long)}), or a portion of TFile with keys
+   * {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile with keys
    * fall in a certain key range (for sorted TFile only,
-   * {@link Reader#createScanner(byte[], byte[])} or
-   * {@link Reader#createScanner(RawComparable, RawComparable)}).
+   * {@link Reader#createScannerByKey(byte[], byte[])} or
+   * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
    */
   public static class Reader implements Closeable {
     // The underlying BCFile reader.
@@ -986,6 +986,16 @@
       return new Location(blkIndex, 0);
     }
 
+    Location getLocationByRecordNum(long recNum) throws IOException {
+      checkTFileDataIndex();
+      return tfileIndex.getLocationByRecordNum(recNum);
+    }
+
+    long getRecordNumByLocation(Location location) throws IOException {
+      checkTFileDataIndex();
+      return tfileIndex.getRecordNumByLocation(location);      
+    }
+    
     int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
       if (!isSorted()) {
         throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
@@ -1017,6 +1027,21 @@
     }
 
     /**
+     * Get the RecordNum for the first key-value pair in a compressed block
+     * whose byte offset in the TFile is greater than or equal to the specified
+     * offset.
+     * 
+     * @param offset
+     *          the user supplied offset.
+     * @return the RecordNum to the corresponding entry. If no such entry
+     *         exists, it returns the total entry count.
+     * @throws IOException
+     */
+    public long getRecordNumNear(long offset) throws IOException {
+      return getRecordNumByLocation(getLocationNear(offset));
+    }
+    
+    /**
      * Get a sample key that is within a block whose starting offset is greater
      * than or equal to the specified offset.
      * 
@@ -1058,7 +1083,7 @@
      *         contains zero key-value pairs even if length is positive.
      * @throws IOException
      */
-    public Scanner createScanner(long offset, long length) throws IOException {
+    public Scanner createScannerByByteRange(long offset, long length) throws IOException {
       return new Scanner(this, offset, offset + length);
     }
 
@@ -1074,10 +1099,31 @@
      * @return The actual coverage of the returned scanner will cover all keys
      *         greater than or equal to the beginKey and less than the endKey.
      * @throws IOException
+     * 
+     * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
      */
+    @Deprecated
     public Scanner createScanner(byte[] beginKey, byte[] endKey)
+      throws IOException {
+      return createScannerByKey(beginKey, endKey);
+    }
+    
+    /**
+     * Get a scanner that covers a portion of TFile based on keys.
+     * 
+     * @param beginKey
+     *          Begin key of the scan (inclusive). If null, scan from the first
+     *          key-value entry of the TFile.
+     * @param endKey
+     *          End key of the scan (exclusive). If null, scan up to the last
+     *          key-value entry of the TFile.
+     * @return The actual coverage of the returned scanner will cover all keys
+     *         greater than or equal to the beginKey and less than the endKey.
+     * @throws IOException
+     */
+    public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
         throws IOException {
-      return createScanner((beginKey == null) ? null : new ByteArray(beginKey,
+      return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
           0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
           0, endKey.length));
     }
@@ -1094,9 +1140,31 @@
      * @return The actual coverage of the returned scanner will cover all keys
      *         greater than or equal to the beginKey and less than the endKey.
      * @throws IOException
+     * 
+     * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
+     *             instead.
      */
+    @Deprecated
     public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
         throws IOException {
+      return createScannerByKey(beginKey, endKey);
+    }
+
+    /**
+     * Get a scanner that covers a specific key range.
+     * 
+     * @param beginKey
+     *          Begin key of the scan (inclusive). If null, scan from the first
+     *          key-value entry of the TFile.
+     * @param endKey
+     *          End key of the scan (exclusive). If null, scan up to the last
+     *          key-value entry of the TFile.
+     * @return The actual coverage of the returned scanner will cover all keys
+     *         greater than or equal to the beginKey and less than the endKey.
+     * @throws IOException
+     */
+    public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
+        throws IOException {
       if ((beginKey != null) && (endKey != null)
           && (compareKeys(beginKey, endKey) >= 0)) {
         return new Scanner(this, beginKey, beginKey);
@@ -1105,6 +1173,27 @@
     }
 
     /**
+     * Create a scanner that covers a range of records.
+     * 
+     * @param beginRecNum
+     *          The RecordNum for the first record (inclusive).
+     * @param endRecNum
+     *          The RecordNum for the last record (exclusive). To scan the whole
+     *          file, either specify endRecNum==-1 or endRecNum==getEntryCount().
+     * @return The TFile scanner that covers the specified range of records.
+     * @throws IOException
+     */
+    public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
+        throws IOException {
+      if (beginRecNum < 0) beginRecNum = 0;
+      if (endRecNum < 0 || endRecNum > getEntryCount()) {
+        endRecNum = getEntryCount();
+      }
+      return new Scanner(this, getLocationByRecordNum(beginRecNum),
+          getLocationByRecordNum(endRecNum));
+    }
+
+    /**
      * The TFile Scanner. The Scanner has an implicit cursor, which, upon
      * creation, points to the first key-value pair in the scan range. If the
      * scan range is empty, the cursor will point to the end of the scan range.
@@ -1524,6 +1613,15 @@
       }
 
       /**
+       * Get the RecordNum corresponding to the entry pointed by the cursor.
+       * @return The RecordNum corresponding to the entry pointed by the cursor.
+       * @throws IOException
+       */
+      public long getRecordNum() throws IOException {
+        return reader.getRecordNumByLocation(currentLocation);
+      }
+      
+      /**
        * Internal API. Comparing the key at cursor to user-specified key.
        * 
        * @param other
@@ -2021,8 +2119,10 @@
     final static String BLOCK_NAME = "TFile.index";
     private ByteArray firstKey;
     private final ArrayList<TFileIndexEntry> index;
+    private final ArrayList<Long> recordNumIndex;
     private final BytesComparator comparator;
-
+    private long sum = 0;
+    
     /**
      * For reading from file.
      * 
@@ -2031,6 +2131,7 @@
     public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
         throws IOException {
       index = new ArrayList<TFileIndexEntry>(entryCount);
+      recordNumIndex = new ArrayList<Long>(entryCount);
       int size = Utils.readVInt(in); // size for the first key entry.
       if (size > 0) {
         byte[] buffer = new byte[size];
@@ -2052,6 +2153,8 @@
               new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
                   buffer, 0, size)));
           index.add(idx);
+          sum += idx.entries();
+          recordNumIndex.add(sum);
         }
       } else {
         if (entryCount != 0) {
@@ -2083,6 +2186,12 @@
       return ret;
     }
 
+    /**
+     * @param key
+     *          input key.
+     * @return the ID of the first block that contains key > input key. Or -1
+     *         if no such block exists.
+     */
     public int upperBound(RawComparable key) {
       if (comparator == null) {
         throw new RuntimeException("Cannot search in unsorted TFile");
@@ -2104,13 +2213,26 @@
      */
     public TFileIndex(BytesComparator comparator) {
       index = new ArrayList<TFileIndexEntry>();
+      recordNumIndex = new ArrayList<Long>();
       this.comparator = comparator;
     }
 
     public RawComparable getFirstKey() {
       return firstKey;
     }
+    
+    public Reader.Location getLocationByRecordNum(long recNum) {
+      int idx = Utils.upperBound(recordNumIndex, recNum);
+      long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
+      return new Reader.Location(idx, recNum-lastRecNum);
+    }
 
+    public long getRecordNumByLocation(Reader.Location location) {
+      int blkIndex = location.getBlockIndex();
+      long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
+      return lastRecNum + location.getRecordIndex();
+    }
+    
     public void setFirstKey(byte[] key, int offset, int length) {
       firstKey = new ByteArray(new byte[length]);
       System.arraycopy(key, offset, firstKey.buffer(), 0, length);
@@ -2125,6 +2247,8 @@
 
     public void addEntry(TFileIndexEntry keyEntry) {
       index.add(keyEntry);
+      sum += keyEntry.entries();
+      recordNumIndex.add(sum);
     }
 
     public TFileIndexEntry getEntry(int bid) {
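
The TFileIndex changes above boil down to keeping a running total of records per block and binary-searching it. The standalone class below is a hypothetical sketch of that cumulative-count / upper-bound idea (it is not the Hadoop class; the real code uses Utils.upperBound and Reader.Location):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of the recordNumIndex bookkeeping in TFileIndex.
    class RecordNumIndexSketch {
      private final List<Long> cumulative = new ArrayList<Long>(); // records in blocks 0..i
      private long sum = 0;

      // Called once per data block, like TFileIndex.addEntry().
      void addBlock(long entriesInBlock) {
        sum += entriesInBlock;
        cumulative.add(sum);
      }

      // Global record number -> {blockIndex, recordIndexWithinBlock}.
      long[] locate(long recNum) {
        int blk = upperBound(cumulative, recNum);
        long before = (blk == 0) ? 0 : cumulative.get(blk - 1);
        return new long[] { blk, recNum - before };
      }

      // Inverse mapping: {blockIndex, recordIndexWithinBlock} -> global record number.
      long recordNum(int blockIndex, long recordIndexWithinBlock) {
        long before = (blockIndex == 0) ? 0 : cumulative.get(blockIndex - 1);
        return before + recordIndexWithinBlock;
      }

      // Index of the first cumulative count strictly greater than key.
      private static int upperBound(List<Long> list, long key) {
        int lo = 0, hi = list.size();
        while (lo < hi) {
          int mid = (lo + hi) >>> 1;
          if (list.get(mid) <= key) lo = mid + 1; else hi = mid;
        }
        return lo;
      }
    }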

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java?rev=824516&r1=824515&r2=824516&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java Mon Oct 12 22:07:17 2009
@@ -319,7 +319,7 @@
 
     scanner.close();
     // test for a range of scanner
-    scanner = reader.createScanner(getSomeKey(10), getSomeKey(60));
+    scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60));
     readAndCheckbytes(scanner, 10, 50);
     assertFalse(scanner.advance());
     scanner.close();

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java?rev=824516&r1=824515&r2=824516&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java Mon Oct 12 22:07:17 2009
@@ -673,7 +673,7 @@
     Reader reader =
         new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     try {
@@ -698,7 +698,7 @@
       throws IOException {
     Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     try {
@@ -729,7 +729,7 @@
     Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
 
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     byte[] vbuf1 = new byte[BUF_SIZE];
@@ -753,7 +753,7 @@
     Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
 
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     // read the indexed key

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java?rev=824516&r1=824515&r2=824516&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java Mon Oct 12 22:07:17 2009
@@ -17,6 +17,7 @@
 package org.apache.hadoop.io.file.tfile;
 
 import java.io.IOException;
+import java.util.Random;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -42,6 +43,7 @@
   private FileSystem fs;
   private Configuration conf;
   private Path path;
+  private Random random = new Random();
 
   private String comparator = "memcmp";
   private String outputFile = "TestTFileSplit";
@@ -74,7 +76,7 @@
     long rowCount = 0;
     BytesWritable key, value;
     for (int i = 0; i < numSplit; ++i, offset += splitSize) {
-      Scanner scanner = reader.createScanner(offset, splitSize);
+      Scanner scanner = reader.createScannerByByteRange(offset, splitSize);
       int count = 0;
       key = new BytesWritable();
       value = new BytesWritable();
@@ -90,18 +92,101 @@
     Assert.assertEquals(rowCount, reader.getEntryCount());
     reader.close();
   }
+
+  /* Similar to readFile(), tests the scanner created 
+   * by record numbers rather than the offsets.
+   */
+  void readRowSplits(int numSplits) throws IOException {
+
+    Reader reader =
+      new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    
+    long totalRecords = reader.getEntryCount();
+    for (int i=0; i<numSplits; i++) {
+      long startRec = i*totalRecords/numSplits;
+      long endRec = (i+1)*totalRecords/numSplits;
+      if (i == numSplits-1) {
+        endRec = totalRecords;
+      }
+      Scanner scanner = reader.createScannerByRecordNum(startRec, endRec);
+      int count = 0;
+      BytesWritable key = new BytesWritable();
+      BytesWritable value = new BytesWritable();
+      long x=startRec;
+      while (!scanner.atEnd()) {
+        assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x);
+        scanner.entry().get(key, value);
+        ++count;
+        assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x);
+        scanner.advance();
+        ++x;
+      }
+      scanner.close();
+      Assert.assertTrue(count == (endRec - startRec));
+    }
+    // make sure specifying range at the end gives zero records.
+    Scanner scanner = reader.createScannerByRecordNum(totalRecords, -1);
+    Assert.assertTrue(scanner.atEnd());
+  }
   
   static String composeSortedKey(String prefix, int total, int value) {
     return String.format("%s%010d", prefix, value);
   }
   
+  void checkRecNums() throws IOException {
+    long fileLen = fs.getFileStatus(path).getLen();
+    Reader reader = new Reader(fs.open(path), fileLen, conf);
+    long totalRecs = reader.getEntryCount();
+    long begin = random.nextLong() % (totalRecs / 2);
+    if (begin < 0)
+      begin += (totalRecs / 2);
+    long end = random.nextLong() % (totalRecs / 2);
+    if (end < 0)
+      end += (totalRecs / 2);
+    end += (totalRecs / 2) + 1;
+
+    assertEquals("RecNum for offset=0 should be 0", 0, reader
+        .getRecordNumNear(0));
+    for (long x : new long[] { fileLen, fileLen + 1, 2 * fileLen }) {
+      assertEquals("RecNum for offset>=fileLen should be total entries",
+          totalRecs, reader.getRecordNumNear(x));
+    }
+
+    for (long i = 0; i < 100; ++i) {
+      assertEquals("Locaton to RecNum conversion not symmetric", i, reader
+          .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
+    }
+
+    for (long i = 1; i < 100; ++i) {
+      long x = totalRecs - i;
+      assertEquals("Locaton to RecNum conversion not symmetric", x, reader
+          .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
+    }
+
+    for (long i = begin; i < end; ++i) {
+      assertEquals("Locaton to RecNum conversion not symmetric", i, reader
+          .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
+    }
+
+    for (int i = 0; i < 1000; ++i) {
+      long x = random.nextLong() % totalRecs;
+      if (x < 0) x += totalRecs;
+      assertEquals("Locaton to RecNum conversion not symmetric", x, reader
+          .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
+    }
+  }
+  
   public void testSplit() throws IOException {
     System.out.println("testSplit");
     createFile(100000, Compression.Algorithm.NONE.getName());
+    checkRecNums();   
     readFile();
+    readRowSplits(10);
     fs.delete(path, true);
     createFile(500000, Compression.Algorithm.GZ.getName());
+    checkRecNums();
     readFile();
+    readRowSplits(83);
     fs.delete(path, true);
   }
 }
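
One intended use of getRecordNumNear() is to snap byte-offset splits to record boundaries so adjacent splits neither overlap nor drop records. A rough sketch (not part of the patch; reader, splitSize and numSplit are assumed to be set up as in readFile() above):

    // Sketch: convert byte-based splits into disjoint record ranges.
    long offset = 0;
    for (int i = 0; i < numSplit; ++i, offset += splitSize) {
      long beginRecNum = reader.getRecordNumNear(offset);
      long endRecNum = (i == numSplit - 1)
          ? reader.getEntryCount()                        // last split runs to the end
          : reader.getRecordNumNear(offset + splitSize);  // equals the next split's begin
      Scanner scanner = reader.createScannerByRecordNum(beginRecNum, endRecNum);
      // ... consume the scanner as readRowSplits() does ...
      scanner.close();
    }

Because adjacent splits compute the shared boundary with the same getRecordNumNear() call on the same offset, every record lands in exactly one split.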

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java?rev=824516&r1=824515&r2=824516&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java Mon Oct 12 22:07:17 2009
@@ -89,7 +89,7 @@
 
     try {
       Scanner scanner =
-          reader.createScanner("aaa".getBytes(), "zzz".getBytes());
+          reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes());
       Assert
           .fail("Failed to catch creating scanner with keys on unsorted file.");
     }