You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [34/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java Thu Jan 21 10:37:58 2010
@@ -65,226 +65,225 @@
 
 package org.apache.hadoop.hive.ql.util.jdbm.recman;
 
-import java.io.*;
+import java.io.IOException;
 
 /**
- *  This class manages the linked lists of pages that make up a file.
+ * This class manages the linked lists of pages that make up a file.
  */
 final class PageManager {
-    // our record file
-    private RecordFile file;
-    // header data
-    private FileHeader header;
-    private BlockIo headerBuf;
-    
-    /**
-     *  Creates a new page manager using the indicated record file.
-     */
-    PageManager(RecordFile file) throws IOException {
-        this.file = file;
-        
-        // check the file header. If the magic is 0, we assume a new
-        // file. Note that we hold on to the file header node.
-        headerBuf = file.get(0);
-        if (headerBuf.readShort(0) == 0)
-            header = new FileHeader(headerBuf, true);
-        else
-            header = new FileHeader(headerBuf, false);
-    }
-    
-    /**
-     *  Allocates a page of the indicated type. Returns recid of the
-     *  page.
-     */
-    long allocate(short type) throws IOException {
-        
-        if (type == Magic.FREE_PAGE)
-            throw new Error("allocate of free page?");
-        
-        // do we have something on the free list?
-        long retval = header.getFirstOf(Magic.FREE_PAGE);
-        boolean isNew = false;
-        if (retval != 0) {
-            // yes. Point to it and make the next of that page the
-            // new first free page.
-            header.setFirstOf(Magic.FREE_PAGE, getNext(retval));
-        }
-        else {
-            // nope. make a new record
-            retval = header.getLastOf(Magic.FREE_PAGE);
-            if (retval == 0)
-                // very new file - allocate record #1
-                retval = 1;
-            header.setLastOf(Magic.FREE_PAGE, retval + 1);
-            isNew = true;
-        }
-        
-        // Cool. We have a record, add it to the correct list
-        BlockIo buf = file.get(retval);
-        PageHeader pageHdr = isNew ? new PageHeader(buf, type) 
-            : PageHeader.getView(buf);
-        long oldLast = header.getLastOf(type);
-        
-        // Clean data.
-        System.arraycopy(RecordFile.cleanData, 0, 
-                         buf.getData(), 0, 
-                         RecordFile.BLOCK_SIZE);
-        pageHdr.setType(type);
-        pageHdr.setPrev(oldLast);
-        pageHdr.setNext(0);
-        
-        
-        if (oldLast == 0)
-            // This was the first one of this type
-            header.setFirstOf(type, retval);
-        header.setLastOf(type, retval);
-        file.release(retval, true);
-        
-        // If there's a previous, fix up its pointer
-        if (oldLast != 0) {
-            buf = file.get(oldLast);
-            pageHdr = PageHeader.getView(buf);
-            pageHdr.setNext(retval);
-            file.release(oldLast, true);
-        }
-        
-        // remove the view, we have modified the type.
-        buf.setView(null);
-        
-        return retval;
-    }
-    
-    /**
-     *  Frees a page of the indicated type.
-     */
-    void free(short type, long recid) throws IOException {
-        if (type == Magic.FREE_PAGE)
-            throw new Error("free free page?");
-        if (recid == 0)
-            throw new Error("free header page?");
-        
-        // get the page and read next and previous pointers
-        BlockIo buf = file.get(recid);
-        PageHeader pageHdr = PageHeader.getView(buf);
-        long prev = pageHdr.getPrev();
-        long next = pageHdr.getNext();
-        
-        // put the page at the front of the free list.
-        pageHdr.setType(Magic.FREE_PAGE);
-        pageHdr.setNext(header.getFirstOf(Magic.FREE_PAGE));
-        pageHdr.setPrev(0);
-        
-        header.setFirstOf(Magic.FREE_PAGE, recid);
-        file.release(recid, true);
-        
-        // remove the page from its old list
-        if (prev != 0) {
-            buf = file.get(prev);
-            pageHdr = PageHeader.getView(buf);
-            pageHdr.setNext(next);
-            file.release(prev, true);
-        }
-        else {
-            header.setFirstOf(type, next);
-        }
-        if (next != 0) {
-            buf = file.get(next);
-            pageHdr = PageHeader.getView(buf);
-            pageHdr.setPrev(prev);
-            file.release(next, true);
-        }
-        else {
-            header.setLastOf(type, prev);
-        }
-        
-    }
-    
-    
-    /**
-     *  Returns the page following the indicated block
-     */
-    long getNext(long block) throws IOException {
-        try {
-            return PageHeader.getView(file.get(block)).getNext();
-        } finally {
-            file.release(block, false);
-        }
-    }
-    
-    /**
-     *  Returns the page before the indicated block
-     */
-    long getPrev(long block) throws IOException {
-        try {
-            return PageHeader.getView(file.get(block)).getPrev();
-        } finally {
-            file.release(block, false);
-        }
-    }
-    
-    /**
-     *  Returns the first page on the indicated list.
-     */
-    long getFirst(short type) throws IOException {
-        return header.getFirstOf(type);
-    }
-
-    /**
-     *  Returns the last page on the indicated list.
-     */
-    long getLast(short type) throws IOException {
-        return header.getLastOf(type);
-    }
-    
-    
-    /**
-     *  Commit all pending (in-memory) data by flushing the page manager.
-     *  This forces a flush of all outstanding blocks (this it's an implicit
-     *  {@link RecordFile#commit} as well).
-     */
-    void commit() throws IOException {
-        // write the header out
-        file.release(headerBuf);
-        file.commit();
-
-        // and obtain it again
-        headerBuf = file.get(0);
-        header = new FileHeader(headerBuf, false);
-    }
-
-    /**
-     *  Flushes the page manager. This forces a flush of all outstanding
-     *  blocks (this it's an implicit {@link RecordFile#commit} as well).
-     */
-    void rollback() throws IOException {
-        // release header
-        file.discard(headerBuf);
-        file.rollback();
-        // and obtain it again
-        headerBuf = file.get(0);
-        if (headerBuf.readShort(0) == 0)
-            header = new FileHeader(headerBuf, true);
-        else
-            header = new FileHeader(headerBuf, false);
-    }
-    
-    /**
-     *  Closes the page manager. This flushes the page manager and releases
-     *  the lock on the header.
-     */
-    void close() throws IOException {   
-        file.release(headerBuf);
-        file.commit();
-        headerBuf = null;
-        header = null;
-        file = null;
-    }
-    
-    /**
-     *  Returns the file header.
-     */
-    FileHeader getFileHeader() {
-        return header;
-    }
-    
+  // our record file
+  private RecordFile file;
+  // header data
+  private FileHeader header;
+  private BlockIo headerBuf;
+
+  /**
+   * Creates a new page manager using the indicated record file.
+   */
+  PageManager(RecordFile file) throws IOException {
+    this.file = file;
+
+    // check the file header. If the magic is 0, we assume a new
+    // file. Note that we hold on to the file header node.
+    headerBuf = file.get(0);
+    if (headerBuf.readShort(0) == 0) {
+      header = new FileHeader(headerBuf, true);
+    } else {
+      header = new FileHeader(headerBuf, false);
+    }
+  }
+
+  /**
+   * Allocates a page of the indicated type. Returns recid of the page.
+   */
+  long allocate(short type) throws IOException {
+
+    if (type == Magic.FREE_PAGE) {
+      throw new Error("allocate of free page?");
+    }
+
+    // do we have something on the free list?
+    long retval = header.getFirstOf(Magic.FREE_PAGE);
+    boolean isNew = false;
+    if (retval != 0) {
+      // yes. Point to it and make the next of that page the
+      // new first free page.
+      header.setFirstOf(Magic.FREE_PAGE, getNext(retval));
+    } else {
+      // nope. make a new record
+      retval = header.getLastOf(Magic.FREE_PAGE);
+      if (retval == 0) {
+        // very new file - allocate record #1
+        retval = 1;
+      }
+      header.setLastOf(Magic.FREE_PAGE, retval + 1);
+      isNew = true;
+    }
+
+    // Cool. We have a record, add it to the correct list
+    BlockIo buf = file.get(retval);
+    PageHeader pageHdr = isNew ? new PageHeader(buf, type) : PageHeader
+        .getView(buf);
+    long oldLast = header.getLastOf(type);
+
+    // Clean data.
+    System.arraycopy(RecordFile.cleanData, 0, buf.getData(), 0,
+        RecordFile.BLOCK_SIZE);
+    pageHdr.setType(type);
+    pageHdr.setPrev(oldLast);
+    pageHdr.setNext(0);
+
+    if (oldLast == 0) {
+      // This was the first one of this type
+      header.setFirstOf(type, retval);
+    }
+    header.setLastOf(type, retval);
+    file.release(retval, true);
+
+    // If there's a previous, fix up its pointer
+    if (oldLast != 0) {
+      buf = file.get(oldLast);
+      pageHdr = PageHeader.getView(buf);
+      pageHdr.setNext(retval);
+      file.release(oldLast, true);
+    }
+
+    // remove the view, we have modified the type.
+    buf.setView(null);
+
+    return retval;
+  }
+
+  /**
+   * Frees a page of the indicated type.
+   */
+  void free(short type, long recid) throws IOException {
+    if (type == Magic.FREE_PAGE) {
+      throw new Error("free free page?");
+    }
+    if (recid == 0) {
+      throw new Error("free header page?");
+    }
+
+    // get the page and read next and previous pointers
+    BlockIo buf = file.get(recid);
+    PageHeader pageHdr = PageHeader.getView(buf);
+    long prev = pageHdr.getPrev();
+    long next = pageHdr.getNext();
+
+    // put the page at the front of the free list.
+    pageHdr.setType(Magic.FREE_PAGE);
+    pageHdr.setNext(header.getFirstOf(Magic.FREE_PAGE));
+    pageHdr.setPrev(0);
+
+    header.setFirstOf(Magic.FREE_PAGE, recid);
+    file.release(recid, true);
+
+    // remove the page from its old list
+    if (prev != 0) {
+      buf = file.get(prev);
+      pageHdr = PageHeader.getView(buf);
+      pageHdr.setNext(next);
+      file.release(prev, true);
+    } else {
+      header.setFirstOf(type, next);
+    }
+    if (next != 0) {
+      buf = file.get(next);
+      pageHdr = PageHeader.getView(buf);
+      pageHdr.setPrev(prev);
+      file.release(next, true);
+    } else {
+      header.setLastOf(type, prev);
+    }
+
+  }
+
+  /**
+   * Returns the page following the indicated block
+   */
+  long getNext(long block) throws IOException {
+    try {
+      return PageHeader.getView(file.get(block)).getNext();
+    } finally {
+      file.release(block, false);
+    }
+  }
+
+  /**
+   * Returns the page before the indicated block
+   */
+  long getPrev(long block) throws IOException {
+    try {
+      return PageHeader.getView(file.get(block)).getPrev();
+    } finally {
+      file.release(block, false);
+    }
+  }
+
+  /**
+   * Returns the first page on the indicated list.
+   */
+  long getFirst(short type) throws IOException {
+    return header.getFirstOf(type);
+  }
+
+  /**
+   * Returns the last page on the indicated list.
+   */
+  long getLast(short type) throws IOException {
+    return header.getLastOf(type);
+  }
+
+  /**
+   * Commit all pending (in-memory) data by flushing the page manager. This
+   * forces a flush of all outstanding blocks (this it's an implicit
+   * {@link RecordFile#commit} as well).
+   */
+  void commit() throws IOException {
+    // write the header out
+    file.release(headerBuf);
+    file.commit();
+
+    // and obtain it again
+    headerBuf = file.get(0);
+    header = new FileHeader(headerBuf, false);
+  }
+
+  /**
+   * Flushes the page manager. This forces a flush of all outstanding blocks
+   * (this it's an implicit {@link RecordFile#commit} as well).
+   */
+  void rollback() throws IOException {
+    // release header
+    file.discard(headerBuf);
+    file.rollback();
+    // and obtain it again
+    headerBuf = file.get(0);
+    if (headerBuf.readShort(0) == 0) {
+      header = new FileHeader(headerBuf, true);
+    } else {
+      header = new FileHeader(headerBuf, false);
+    }
+  }
+
+  /**
+   * Closes the page manager. This flushes the page manager and releases the
+   * lock on the header.
+   */
+  void close() throws IOException {
+    file.release(headerBuf);
+    file.commit();
+    headerBuf = null;
+    header = null;
+    file = null;
+  }
+
+  /**
+   * Returns the file header.
+   */
+  FileHeader getFileHeader() {
+    return header;
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java Thu Jan 21 10:37:58 2010
@@ -66,48 +66,48 @@
 package org.apache.hadoop.hive.ql.util.jdbm.recman;
 
 /**
- *  A physical rowid is nothing else than a pointer to a physical location
- *  in a file - a (block, offset) tuple.
- *  <P>
- *  <B>Note</B>: The fact that the offset is modelled as a short limits 
- *  the block size to 32k.
+ * A physical rowid is nothing else than a pointer to a physical location in a
+ * file - a (block, offset) tuple.
+ * <P>
+ * <B>Note</B>: The fact that the offset is modelled as a short limits the block
+ * size to 32k.
  */
 class PhysicalRowId {
-    // offsets
-    private static final short O_BLOCK = 0; // long block
-    private static final short O_OFFSET = Magic.SZ_LONG; // short offset
-    static final int SIZE = O_OFFSET + Magic.SZ_SHORT;
-    
-    // my block and the position within the block
-    BlockIo block;
-    short pos;
-
-    /**
-     *  Constructs a physical rowid from the indicated data starting at
-     *  the indicated position.
-     */
-    PhysicalRowId(BlockIo block, short pos) {
-        this.block = block;
-        this.pos = pos;
-    }
-    
-    /** Returns the block number */
-    long getBlock() {
-        return block.readLong(pos + O_BLOCK);
-    }
-    
-    /** Sets the block number */
-    void setBlock(long value) {
-        block.writeLong(pos + O_BLOCK, value);
-    }
-    
-    /** Returns the offset */
-    short getOffset() {
-        return block.readShort(pos + O_OFFSET);
-    }
-    
-    /** Sets the offset */
-    void setOffset(short value) {
-        block.writeShort(pos + O_OFFSET, value);
-    }
+  // offsets
+  private static final short O_BLOCK = 0; // long block
+  private static final short O_OFFSET = Magic.SZ_LONG; // short offset
+  static final int SIZE = O_OFFSET + Magic.SZ_SHORT;
+
+  // my block and the position within the block
+  BlockIo block;
+  short pos;
+
+  /**
+   * Constructs a physical rowid from the indicated data starting at the
+   * indicated position.
+   */
+  PhysicalRowId(BlockIo block, short pos) {
+    this.block = block;
+    this.pos = pos;
+  }
+
+  /** Returns the block number */
+  long getBlock() {
+    return block.readLong(pos + O_BLOCK);
+  }
+
+  /** Sets the block number */
+  void setBlock(long value) {
+    block.writeLong(pos + O_BLOCK, value);
+  }
+
+  /** Returns the offset */
+  short getOffset() {
+    return block.readShort(pos + O_OFFSET);
+  }
+
+  /** Sets the offset */
+  void setOffset(short value) {
+    block.writeShort(pos + O_OFFSET, value);
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java Thu Jan 21 10:37:58 2010
@@ -68,289 +68,267 @@
 import java.io.IOException;
 
 /**
- *  This class manages physical row ids, and their data.
+ * This class manages physical row ids, and their data.
  */
-final class PhysicalRowIdManager
-{
+final class PhysicalRowIdManager {
 
-    // The file we're talking to and the associated page manager.
-    private RecordFile file;
-    private PageManager pageman;
-    private FreePhysicalRowIdPageManager freeman;
-
-    /**
-     *  Creates a new rowid manager using the indicated record file.
-     *  and page manager.
-     */
-    PhysicalRowIdManager( RecordFile file, PageManager pageManager )
-        throws IOException
-    {
-        this.file = file;
-        this.pageman = pageManager;
-        this.freeman = new FreePhysicalRowIdPageManager(file, pageman);
-    }
-
-    /**
-     *  Inserts a new record. Returns the new physical rowid.
-     */
-    Location insert( byte[] data, int start, int length )
-        throws IOException
-    {
-        Location retval = alloc( length );
-        write( retval, data, start, length );
-        return retval;
-    }
-
-    /**
-     *  Updates an existing record. Returns the possibly changed
-     *  physical rowid.
-     */
-    Location update( Location rowid, byte[] data, int start, int length )
-        throws IOException
-    {
-        // fetch the record header
-        BlockIo block = file.get( rowid.getBlock() );
-        RecordHeader head = new RecordHeader( block, rowid.getOffset() );
-        if ( length > head.getAvailableSize() ) {
-            // not enough space - we need to copy to a new rowid.
-            file.release( block );
-            free( rowid );
-            rowid = alloc( length );
-        } else {
-            file.release( block );
-        }
-
-        // 'nuff space, write it in and return the rowid.
-        write( rowid, data, start, length );
-        return rowid;
-    }
-
-    /**
-     *  Deletes a record.
-     */
-    void delete( Location rowid )
-        throws IOException
-    {
-        free( rowid );
-    }
-
-    /**
-     *  Retrieves a record.
-     */
-    byte[] fetch( Location rowid )
-        throws IOException
-    {
-        // fetch the record header
-        PageCursor curs = new PageCursor( pageman, rowid.getBlock() );
-        BlockIo block = file.get( curs.getCurrent() );
-        RecordHeader head = new RecordHeader( block, rowid.getOffset() );
-
-        // allocate a return buffer
-        byte[] retval = new byte[ head.getCurrentSize() ];
-        if ( retval.length == 0 ) {
-            file.release( curs.getCurrent(), false );
-            return retval;
-        }
-
-        // copy bytes in
-        int offsetInBuffer = 0;
-        int leftToRead = retval.length;
-        short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
-        while ( leftToRead > 0 ) {
-            // copy current page's data to return buffer
-            int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
-            if ( leftToRead < toCopy ) {
-                toCopy = leftToRead;
-            }
-            System.arraycopy( block.getData(), dataOffset,
-                              retval, offsetInBuffer,
-                              toCopy );
-
-            // Go to the next block
-            leftToRead -= toCopy;
-            offsetInBuffer += toCopy;
-
-            file.release( block );
-
-            if ( leftToRead > 0 ) {
-                block = file.get( curs.next() );
-                dataOffset = DataPage.O_DATA;
-            }
-
-        }
-
-        return retval;
-    }
-
-    /**
-     *  Allocate a new rowid with the indicated size.
-     */
-    private Location alloc( int size )
-        throws IOException
-    {
-        Location retval = freeman.get( size );
-        if ( retval == null ) {
-            retval = allocNew( size, pageman.getLast( Magic.USED_PAGE ) );
-        }
-        return retval;
-    }
-
-    /**
-     *  Allocates a new rowid. The second parameter is there to
-     *  allow for a recursive call - it indicates where the search
-     *  should start.
-     */
-    private Location allocNew( int size, long start )
-        throws IOException
-    {
-        BlockIo curBlock;
-        DataPage curPage;
-        if ( start == 0 ) {
-            // we need to create a new page.
-            start = pageman.allocate( Magic.USED_PAGE );
-            curBlock = file.get( start );
-            curPage = DataPage.getDataPageView( curBlock );
-            curPage.setFirst( DataPage.O_DATA );
-            RecordHeader hdr = new RecordHeader( curBlock, DataPage.O_DATA );
-            hdr.setAvailableSize( 0 );
-            hdr.setCurrentSize( 0 );
-        } else {
-            curBlock = file.get( start );
-            curPage = DataPage.getDataPageView( curBlock );
-        }
-
-        // follow the rowids on this page to get to the last one. We don't
-        // fall off, because this is the last page, remember?
-        short pos = curPage.getFirst();
-        if ( pos == 0 ) {
-            // page is exactly filled by the last block of a record
-            file.release( curBlock );
-            return allocNew( size, 0 );
-        }
-
-        RecordHeader hdr = new RecordHeader( curBlock, pos );
-        while ( hdr.getAvailableSize() != 0 && pos < RecordFile.BLOCK_SIZE ) {
-            pos += hdr.getAvailableSize() + RecordHeader.SIZE;
-            if ( pos == RecordFile.BLOCK_SIZE ) {
-                // Again, a filled page.
-                file.release( curBlock );
-                return allocNew( size, 0 );
-            }
-
-            hdr = new RecordHeader( curBlock, pos );
-        }
-
-        if ( pos == RecordHeader.SIZE ) {
-            // the last record exactly filled the page. Restart forcing
-            // a new page.
-            file.release( curBlock );
-        }
-
-        // we have the position, now tack on extra pages until we've got
-        // enough space.
-        Location retval = new Location( start, pos );
-        int freeHere = RecordFile.BLOCK_SIZE - pos - RecordHeader.SIZE;
-        if ( freeHere < size ) {
-            // check whether the last page would have only a small bit left.
-            // if yes, increase the allocation. A small bit is a record
-            // header plus 16 bytes.
-            int lastSize = (size - freeHere) % DataPage.DATA_PER_PAGE;
-            if (( DataPage.DATA_PER_PAGE - lastSize ) < (RecordHeader.SIZE + 16) ) {
-                size += (DataPage.DATA_PER_PAGE - lastSize);
-            }
-
-            // write out the header now so we don't have to come back.
-            hdr.setAvailableSize( size );
-            file.release( start, true );
-
-            int neededLeft = size - freeHere;
-            // Refactor these two blocks!
-            while ( neededLeft >= DataPage.DATA_PER_PAGE ) {
-                start = pageman.allocate( Magic.USED_PAGE );
-                curBlock = file.get( start );
-                curPage = DataPage.getDataPageView( curBlock );
-                curPage.setFirst( (short) 0 ); // no rowids, just data
-                file.release( start, true );
-                neededLeft -= DataPage.DATA_PER_PAGE;
-            }
-            if ( neededLeft > 0 ) {
-                // done with whole chunks, allocate last fragment.
-                start = pageman.allocate( Magic.USED_PAGE );
-                curBlock = file.get( start );
-                curPage = DataPage.getDataPageView( curBlock );
-                curPage.setFirst( (short) (DataPage.O_DATA + neededLeft) );
-                file.release( start, true );
-            }
-        } else {
-            // just update the current page. If there's less than 16 bytes
-            // left, we increase the allocation (16 bytes is an arbitrary
-            // number).
-            if ( freeHere - size <= (16 + RecordHeader.SIZE) ) {
-                size = freeHere;
-            }
-            hdr.setAvailableSize( size );
-            file.release( start, true );
-        }
-        return retval;
-
-    }
-
-
-    private void free( Location id )
-        throws IOException
-    {
-        // get the rowid, and write a zero current size into it.
-        BlockIo curBlock = file.get( id.getBlock() );
-        DataPage curPage = DataPage.getDataPageView( curBlock );
-        RecordHeader hdr = new RecordHeader( curBlock, id.getOffset() );
-        hdr.setCurrentSize( 0 );
-        file.release( id.getBlock(), true );
-
-        // write the rowid to the free list
-        freeman.put( id, hdr.getAvailableSize() );
-    }
-
-    /**
-     *  Writes out data to a rowid. Assumes that any resizing has been
-     *  done.
-     */
-    private void write(Location rowid, byte[] data, int start, int length )
-        throws IOException
-    {
-        PageCursor curs = new PageCursor( pageman, rowid.getBlock() );
-        BlockIo block = file.get( curs.getCurrent() );
-        RecordHeader hdr = new RecordHeader( block, rowid.getOffset() );
-        hdr.setCurrentSize( length );
-        if ( length == 0 ) {
-            file.release( curs.getCurrent(), true );
-            return;
-        }
-
-        // copy bytes in
-        int offsetInBuffer = start;
-        int leftToWrite = length;
-        short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
-        while ( leftToWrite > 0 ) {
-            // copy current page's data to return buffer
-            int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
-
-            if ( leftToWrite < toCopy ) {
-                toCopy = leftToWrite;
-            }
-            System.arraycopy( data, offsetInBuffer, block.getData(),
-                              dataOffset, toCopy );
-
-            // Go to the next block
-            leftToWrite -= toCopy;
-            offsetInBuffer += toCopy;
-
-            file.release( curs.getCurrent(), true );
-
-            if ( leftToWrite > 0 ) {
-                block = file.get( curs.next() );
-                dataOffset = DataPage.O_DATA;
-            }
-        }
+  // The file we're talking to and the associated page manager.
+  private final RecordFile file;
+  private final PageManager pageman;
+  private final FreePhysicalRowIdPageManager freeman;
+
+  /**
+   * Creates a new rowid manager using the indicated record file. and page
+   * manager.
+   */
+  PhysicalRowIdManager(RecordFile file, PageManager pageManager)
+      throws IOException {
+    this.file = file;
+    pageman = pageManager;
+    freeman = new FreePhysicalRowIdPageManager(file, pageman);
+  }
+
+  /**
+   * Inserts a new record. Returns the new physical rowid.
+   */
+  Location insert(byte[] data, int start, int length) throws IOException {
+    Location retval = alloc(length);
+    write(retval, data, start, length);
+    return retval;
+  }
+
+  /**
+   * Updates an existing record. Returns the possibly changed physical rowid.
+   */
+  Location update(Location rowid, byte[] data, int start, int length)
+      throws IOException {
+    // fetch the record header
+    BlockIo block = file.get(rowid.getBlock());
+    RecordHeader head = new RecordHeader(block, rowid.getOffset());
+    if (length > head.getAvailableSize()) {
+      // not enough space - we need to copy to a new rowid.
+      file.release(block);
+      free(rowid);
+      rowid = alloc(length);
+    } else {
+      file.release(block);
+    }
+
+    // 'nuff space, write it in and return the rowid.
+    write(rowid, data, start, length);
+    return rowid;
+  }
+
+  /**
+   * Deletes a record.
+   */
+  void delete(Location rowid) throws IOException {
+    free(rowid);
+  }
+
+  /**
+   * Retrieves a record.
+   */
+  byte[] fetch(Location rowid) throws IOException {
+    // fetch the record header
+    PageCursor curs = new PageCursor(pageman, rowid.getBlock());
+    BlockIo block = file.get(curs.getCurrent());
+    RecordHeader head = new RecordHeader(block, rowid.getOffset());
+
+    // allocate a return buffer
+    byte[] retval = new byte[head.getCurrentSize()];
+    if (retval.length == 0) {
+      file.release(curs.getCurrent(), false);
+      return retval;
+    }
+
+    // copy bytes in
+    int offsetInBuffer = 0;
+    int leftToRead = retval.length;
+    short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
+    while (leftToRead > 0) {
+      // copy current page's data to return buffer
+      int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
+      if (leftToRead < toCopy) {
+        toCopy = leftToRead;
+      }
+      System.arraycopy(block.getData(), dataOffset, retval, offsetInBuffer,
+          toCopy);
+
+      // Go to the next block
+      leftToRead -= toCopy;
+      offsetInBuffer += toCopy;
+
+      file.release(block);
+
+      if (leftToRead > 0) {
+        block = file.get(curs.next());
+        dataOffset = DataPage.O_DATA;
+      }
+
+    }
+
+    return retval;
+  }
+
+  /**
+   * Allocate a new rowid with the indicated size.
+   */
+  private Location alloc(int size) throws IOException {
+    Location retval = freeman.get(size);
+    if (retval == null) {
+      retval = allocNew(size, pageman.getLast(Magic.USED_PAGE));
+    }
+    return retval;
+  }
+
+  /**
+   * Allocates a new rowid. The second parameter is there to allow for a
+   * recursive call - it indicates where the search should start.
+   */
+  private Location allocNew(int size, long start) throws IOException {
+    BlockIo curBlock;
+    DataPage curPage;
+    if (start == 0) {
+      // we need to create a new page.
+      start = pageman.allocate(Magic.USED_PAGE);
+      curBlock = file.get(start);
+      curPage = DataPage.getDataPageView(curBlock);
+      curPage.setFirst(DataPage.O_DATA);
+      RecordHeader hdr = new RecordHeader(curBlock, DataPage.O_DATA);
+      hdr.setAvailableSize(0);
+      hdr.setCurrentSize(0);
+    } else {
+      curBlock = file.get(start);
+      curPage = DataPage.getDataPageView(curBlock);
+    }
+
+    // follow the rowids on this page to get to the last one. We don't
+    // fall off, because this is the last page, remember?
+    short pos = curPage.getFirst();
+    if (pos == 0) {
+      // page is exactly filled by the last block of a record
+      file.release(curBlock);
+      return allocNew(size, 0);
     }
-}
 
+    RecordHeader hdr = new RecordHeader(curBlock, pos);
+    while (hdr.getAvailableSize() != 0 && pos < RecordFile.BLOCK_SIZE) {
+      pos += hdr.getAvailableSize() + RecordHeader.SIZE;
+      if (pos == RecordFile.BLOCK_SIZE) {
+        // Again, a filled page.
+        file.release(curBlock);
+        return allocNew(size, 0);
+      }
+
+      hdr = new RecordHeader(curBlock, pos);
+    }
+
+    if (pos == RecordHeader.SIZE) {
+      // the last record exactly filled the page. Restart forcing
+      // a new page.
+      file.release(curBlock);
+    }
+
+    // we have the position, now tack on extra pages until we've got
+    // enough space.
+    Location retval = new Location(start, pos);
+    int freeHere = RecordFile.BLOCK_SIZE - pos - RecordHeader.SIZE;
+    if (freeHere < size) {
+      // check whether the last page would have only a small bit left.
+      // if yes, increase the allocation. A small bit is a record
+      // header plus 16 bytes.
+      int lastSize = (size - freeHere) % DataPage.DATA_PER_PAGE;
+      if ((DataPage.DATA_PER_PAGE - lastSize) < (RecordHeader.SIZE + 16)) {
+        size += (DataPage.DATA_PER_PAGE - lastSize);
+      }
+
+      // write out the header now so we don't have to come back.
+      hdr.setAvailableSize(size);
+      file.release(start, true);
+
+      int neededLeft = size - freeHere;
+      // Refactor these two blocks!
+      while (neededLeft >= DataPage.DATA_PER_PAGE) {
+        start = pageman.allocate(Magic.USED_PAGE);
+        curBlock = file.get(start);
+        curPage = DataPage.getDataPageView(curBlock);
+        curPage.setFirst((short) 0); // no rowids, just data
+        file.release(start, true);
+        neededLeft -= DataPage.DATA_PER_PAGE;
+      }
+      if (neededLeft > 0) {
+        // done with whole chunks, allocate last fragment.
+        start = pageman.allocate(Magic.USED_PAGE);
+        curBlock = file.get(start);
+        curPage = DataPage.getDataPageView(curBlock);
+        curPage.setFirst((short) (DataPage.O_DATA + neededLeft));
+        file.release(start, true);
+      }
+    } else {
+      // just update the current page. If there's less than 16 bytes
+      // left, we increase the allocation (16 bytes is an arbitrary
+      // number).
+      if (freeHere - size <= (16 + RecordHeader.SIZE)) {
+        size = freeHere;
+      }
+      hdr.setAvailableSize(size);
+      file.release(start, true);
+    }
+    return retval;
+
+  }
+
+  private void free(Location id) throws IOException {
+    // get the rowid, and write a zero current size into it.
+    BlockIo curBlock = file.get(id.getBlock());
+    DataPage.getDataPageView(curBlock);
+    RecordHeader hdr = new RecordHeader(curBlock, id.getOffset());
+    hdr.setCurrentSize(0);
+    file.release(id.getBlock(), true);
+
+    // write the rowid to the free list
+    freeman.put(id, hdr.getAvailableSize());
+  }
+
+  /**
+   * Writes out data to a rowid. Assumes that any resizing has been done.
+   */
+  private void write(Location rowid, byte[] data, int start, int length)
+      throws IOException {
+    PageCursor curs = new PageCursor(pageman, rowid.getBlock());
+    BlockIo block = file.get(curs.getCurrent());
+    RecordHeader hdr = new RecordHeader(block, rowid.getOffset());
+    hdr.setCurrentSize(length);
+    if (length == 0) {
+      file.release(curs.getCurrent(), true);
+      return;
+    }
+
+    // copy bytes in
+    int offsetInBuffer = start;
+    int leftToWrite = length;
+    short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
+    while (leftToWrite > 0) {
+      // copy current page's data to return buffer
+      int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
+
+      if (leftToWrite < toCopy) {
+        toCopy = leftToWrite;
+      }
+      System.arraycopy(data, offsetInBuffer, block.getData(), dataOffset,
+          toCopy);
+
+      // Go to the next block
+      leftToWrite -= toCopy;
+      offsetInBuffer += toCopy;
+
+      file.release(curs.getCurrent(), true);
+
+      if (leftToWrite > 0) {
+        block = file.get(curs.next());
+        dataOffset = DataPage.O_DATA;
+      }
+    }
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java Thu Jan 21 10:37:58 2010
@@ -66,86 +66,84 @@
 
 package org.apache.hadoop.hive.ql.util.jdbm.recman;
 
-import java.io.IOException;
 import java.io.File;
+import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
 import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
 import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerProvider;
-
 import org.apache.hadoop.hive.ql.util.jdbm.helper.MRU;
 
 /**
  * Provider of the default RecordManager implementation.
- *
+ * 
  * @author <a href="mailto:boisvert@intalio.com">Alex Boisvert</a>
  * @version $Id: Provider.java,v 1.3 2005/06/25 23:12:32 doomdark Exp $
  */
-public final class Provider
-    implements RecordManagerProvider
-{
-
-    /**
-     * Create a default implementation record manager.
-     *
-     * @param name Name of the record file.
-     * @param options Record manager options.
-     * @throws IOException if an I/O related exception occurs while creating
-     *                    or opening the record manager.
-     * @throws UnsupportedOperationException if some options are not supported by the
-     *                                      implementation.
-     * @throws IllegalArgumentException if some options are invalid.
-     */
-    public RecordManager createRecordManager( String name,
-                                              Properties options )
-        throws IOException
-    {
-        RecordManager  recman;
-
-        recman = new BaseRecordManager( name );
-        recman = getCachedRecordManager(recman, options);
-        return recman;
-    }
-    
-    private RecordManager getCachedRecordManager(RecordManager recman, Properties options)
-    {
-        String         value;
-        int            cacheSize;
-        
-        value = options.getProperty( RecordManagerOptions.DISABLE_TRANSACTIONS, "false" );
-        if ( value.equalsIgnoreCase( "TRUE" ) ) {
-            ( (BaseRecordManager) recman ).disableTransactions();
-        }
-
-        value = options.getProperty( RecordManagerOptions.CACHE_SIZE, "1000" );
-        cacheSize = Integer.parseInt( value );
-
-        value = options.getProperty( RecordManagerOptions.CACHE_TYPE,
-                                     RecordManagerOptions.NORMAL_CACHE );
-        if ( value.equalsIgnoreCase( RecordManagerOptions.NORMAL_CACHE ) ) {
-            MRU cache = new MRU( cacheSize );
-            recman = new CacheRecordManager( recman, cache );
-        } else if ( value.equalsIgnoreCase( RecordManagerOptions.SOFT_REF_CACHE ) ) {
-            throw new IllegalArgumentException( "Soft reference cache not implemented" );
-        } else if ( value.equalsIgnoreCase( RecordManagerOptions.WEAK_REF_CACHE ) ) {
-            throw new IllegalArgumentException( "Weak reference cache not implemented" );
-        } else if ( value.equalsIgnoreCase(RecordManagerOptions.NO_CACHE) ){
-          // do nothing
-        } else {
-            throw new IllegalArgumentException( "Invalid cache type: " + value );
-        } 
+public final class Provider implements RecordManagerProvider {
 
-        return recman;
+  /**
+   * Create a default implementation record manager.
+   * 
+   * @param name
+   *          Name of the record file.
+   * @param options
+   *          Record manager options.
+   * @throws IOException
+   *           if an I/O related exception occurs while creating or opening the
+   *           record manager.
+   * @throws UnsupportedOperationException
+   *           if some options are not supported by the implementation.
+   * @throws IllegalArgumentException
+   *           if some options are invalid.
+   */
+  public RecordManager createRecordManager(String name, Properties options)
+      throws IOException {
+    RecordManager recman;
+
+    recman = new BaseRecordManager(name);
+    recman = getCachedRecordManager(recman, options);
+    return recman;
+  }
+
+  private RecordManager getCachedRecordManager(RecordManager recman,
+      Properties options) {
+    String value;
+    int cacheSize;
+
+    value = options.getProperty(RecordManagerOptions.DISABLE_TRANSACTIONS,
+        "false");
+    if (value.equalsIgnoreCase("TRUE")) {
+      ((BaseRecordManager) recman).disableTransactions();
     }
 
-    public RecordManager createRecordManager ( File file, 
-                                              Properties options )
-        throws IOException
-    {
-      RecordManager recman = new BaseRecordManager(file);
-      recman = getCachedRecordManager(recman, options);
-      return recman;
+    value = options.getProperty(RecordManagerOptions.CACHE_SIZE, "1000");
+    cacheSize = Integer.parseInt(value);
+
+    value = options.getProperty(RecordManagerOptions.CACHE_TYPE,
+        RecordManagerOptions.NORMAL_CACHE);
+    if (value.equalsIgnoreCase(RecordManagerOptions.NORMAL_CACHE)) {
+      MRU cache = new MRU(cacheSize);
+      recman = new CacheRecordManager(recman, cache);
+    } else if (value.equalsIgnoreCase(RecordManagerOptions.SOFT_REF_CACHE)) {
+      throw new IllegalArgumentException("Soft reference cache not implemented");
+    } else if (value.equalsIgnoreCase(RecordManagerOptions.WEAK_REF_CACHE)) {
+      throw new IllegalArgumentException("Weak reference cache not implemented");
+    } else if (value.equalsIgnoreCase(RecordManagerOptions.NO_CACHE)) {
+      // do nothing
+    } else {
+      throw new IllegalArgumentException("Invalid cache type: " + value);
     }
 
+    return recman;
+  }
+
+  public RecordManager createRecordManager(File file, Properties options)
+      throws IOException {
+    RecordManager recman = new BaseRecordManager(file);
+    recman = getCachedRecordManager(recman, options);
+    return recman;
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java Thu Jan 21 10:37:58 2010
@@ -68,31 +68,31 @@
 import java.io.IOException;
 
 /**
- *  This interface is used for synchronization.
- *  <p>
- *  RecordManager ensures that the cache has the up-to-date information
- *  by way of an invalidation protocol.
+ * This interface is used for synchronization.
+ * <p>
+ * RecordManager ensures that the cache has the up-to-date information by way of
+ * an invalidation protocol.
  */
 public interface RecordCache {
 
-    /**
-     * Notification to flush content related to a given record.
-     */
-    public void flush(long recid) throws IOException;
-
-    /**
-     * Notification to flush data all of records.
-     */
-    public void flushAll() throws IOException;
-
-    /**
-     * Notification to invalidate content related to given record.
-     */
-    public void invalidate(long recid) throws IOException;
-
-    /**
-     * Notification to invalidate content of all records.
-     */
-    public void invalidateAll() throws IOException;
+  /**
+   * Notification to flush content related to a given record.
+   */
+  public void flush(long recid) throws IOException;
+
+  /**
+   * Notification to flush data all of records.
+   */
+  public void flushAll() throws IOException;
+
+  /**
+   * Notification to invalidate content related to given record.
+   */
+  public void invalidate(long recid) throws IOException;
+
+  /**
+   * Notification to invalidate content of all records.
+   */
+  public void invalidateAll() throws IOException;
 
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java Thu Jan 21 10:37:58 2010
@@ -65,384 +65,389 @@
 
 package org.apache.hadoop.hive.ql.util.jdbm.recman;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 
 /**
- *  This class represents a random access file as a set of fixed size
- *  records. Each record has a physical record number, and records are
- *  cached in order to improve access.
+ * This class represents a random access file as a set of fixed size records.
+ * Each record has a physical record number, and records are cached in order to
+ * improve access.
  *<p>
- *  The set of dirty records on the in-use list constitutes a transaction.
- *  Later on, we will send these records to some recovery thingy.
+ * The set of dirty records on the in-use list constitutes a transaction. Later
+ * on, we will send these records to some recovery thingy.
  */
 public final class RecordFile {
-    final TransactionManager txnMgr;
+  final TransactionManager txnMgr;
 
-    // Todo: reorganize in hashes and fifos as necessary.
-    // free -> inUse -> dirty -> inTxn -> free
-    // free is a cache, thus a FIFO. The rest are hashes.
-    private final LinkedList free = new LinkedList();
-    private final HashMap inUse = new HashMap();
-    private final HashMap dirty = new HashMap();
-    private final HashMap inTxn = new HashMap();
-
-    // transactions disabled?
-    private boolean transactionsDisabled = false;
-
-    /** The length of a single block. */
-    public final static int BLOCK_SIZE = 8192;//4096;
-
-    /** The extension of a record file */
-    final static String extension = ".db";
-
-    /** A block of clean data to wipe clean pages. */
-    final static byte[] cleanData = new byte[BLOCK_SIZE];
-
-    private RandomAccessFile file;
-    private final String fileName;
-
-    /**
-     *  Creates a new object on the indicated filename. The file is
-     *  opened in read/write mode.
-     *
-     *  @param fileName the name of the file to open or create, without
-     *         an extension.
-     *  @throws IOException whenever the creation of the underlying
-     *          RandomAccessFile throws it.
-     */
-    RecordFile(String fileName) throws IOException {
-        this.fileName = fileName;
-        file = new RandomAccessFile(fileName + extension, "rw");
-        txnMgr = new TransactionManager(this);
-    }
-    
-    /**
-     *  Creates a new object on the indicated filename. The file is
-     *  opened in read/write mode.
-     *
-     *  @param fileName the name of the file to open or create, without
-     *         an extension.
-     *  @throws IOException whenever the creation of the underlying
-     *          RandomAccessFile throws it.
-     */
-    RecordFile(File file) throws IOException {
-        this.fileName = file.getName();
-        this.file = new RandomAccessFile(file, "rw");
-        txnMgr = new TransactionManager(this);
-    }
-
-    /**
-     *  Returns the file name.
-     */
-    String getFileName() {
-        return fileName;
-    }
-
-    /**
-     *  Disables transactions: doesn't sync and doesn't use the
-     *  transaction manager.
-     */
-    void disableTransactions() {
-        transactionsDisabled = true;
-    }
-
-    /**
-     *  Gets a block from the file. The returned byte array is
-     *  the in-memory copy of the record, and thus can be written
-     *  (and subsequently released with a dirty flag in order to
-     *  write the block back).
-     *
-     *  @param blockid The record number to retrieve.
-     */
-     BlockIo get(long blockid) throws IOException {
-         Long key = new Long(blockid);
-
-         // try in transaction list, dirty list, free list
-         BlockIo node = (BlockIo) inTxn.get(key);
-         if (node != null) {
-             inTxn.remove(key);
-             inUse.put(key, node);
-             return node;
-         }
-         node = (BlockIo) dirty.get(key);
-         if (node != null) {
-             dirty.remove(key);
-             inUse.put(key, node);
-             return node;
-         }
-         for (Iterator i = free.iterator(); i.hasNext(); ) {
-             BlockIo cur = (BlockIo) i.next();
-             if (cur.getBlockId() == blockid) {
-                 node = cur;
-                 i.remove();
-                 inUse.put(key, node);
-                 return node;
-             }
-         }
-
-         // sanity check: can't be on in use list
-         if (inUse.get(key) != null) {
-             throw new Error("double get for block " + blockid);
-         }
-
-         // get a new node and read it from the file
-         node = getNewNode(blockid);
-         long offset = blockid * BLOCK_SIZE;
-         if (file.length() > 0 && offset <= file.length()) {
-             read(file, offset, node.getData(), BLOCK_SIZE);
-         } else {
-             System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
-         }
-         inUse.put(key, node);
-         node.setClean();
-         return node;
-     }
-
-
-    /**
-     *  Releases a block.
-     *
-     *  @param blockid The record number to release.
-     *  @param isDirty If true, the block was modified since the get().
-     */
-    void release(long blockid, boolean isDirty)
-    throws IOException {
-        BlockIo node = (BlockIo) inUse.get(new Long(blockid));
-        if (node == null)
-            throw new IOException("bad blockid " + blockid + " on release");
-        if (!node.isDirty() && isDirty)
-            node.setDirty();
-        release(node);
-    }
-
-    /**
-     *  Releases a block.
-     *
-     *  @param block The block to release.
-     */
-    void release(BlockIo block) {
-        Long key = new Long(block.getBlockId());
-        inUse.remove(key);
-        if (block.isDirty()) {
-            // System.out.println( "Dirty: " + key + block );
-            dirty.put(key, block);
-        } else {
-            if (!transactionsDisabled && block.isInTransaction()) {
-                inTxn.put(key, block);
-            } else {
-                free.add(block);
-            }
-        }
-    }
-
-    /**
-     *  Discards a block (will not write the block even if it's dirty)
-     *
-     *  @param block The block to discard.
-     */
-    void discard(BlockIo block) {
-        Long key = new Long(block.getBlockId());
-        inUse.remove(key);
-
-        // note: block not added to free list on purpose, because
-        //       it's considered invalid
-    }
-
-    /**
-     *  Commits the current transaction by flushing all dirty buffers
-     *  to disk.
-     */
-    void commit() throws IOException {
-        // debugging...
-        if (!inUse.isEmpty() && inUse.size() > 1) {
-            showList(inUse.values().iterator());
-            throw new Error("in use list not empty at commit time ("
-                            + inUse.size() + ")");
-        }
-
-        //  System.out.println("committing...");
-
-        if ( dirty.size() == 0 ) {
-            // if no dirty blocks, skip commit process
-            return;
-        }
-
-        if (!transactionsDisabled) {
-            txnMgr.start();
-        }
-
-        for (Iterator i = dirty.values().iterator(); i.hasNext(); ) {
-            BlockIo node = (BlockIo) i.next();
-            i.remove();
-            // System.out.println("node " + node + " map size now " + dirty.size());
-            if (transactionsDisabled) {
-                long offset = node.getBlockId() * BLOCK_SIZE;
-                file.seek(offset);
-                file.write(node.getData());
-                node.setClean();
-                free.add(node);
-            }
-            else {
-                txnMgr.add(node);
-                inTxn.put(new Long(node.getBlockId()), node);
-            }
-        }
-        if (!transactionsDisabled) {
-            txnMgr.commit();
-        }
-    }
-
-    /**
-     *  Rollback the current transaction by discarding all dirty buffers
-     */
-    void rollback() throws IOException {
-        // debugging...
-        if (!inUse.isEmpty()) {
-            showList(inUse.values().iterator());
-            throw new Error("in use list not empty at rollback time ("
-                            + inUse.size() + ")");
-        }
-        //  System.out.println("rollback...");
-        dirty.clear();
-
-        txnMgr.synchronizeLogFromDisk();
-
-        if (!inTxn.isEmpty()) {
-            showList(inTxn.values().iterator());
-            throw new Error("in txn list not empty at rollback time ("
-                            + inTxn.size() + ")");
-        };
-    }
-
-    /**
-     *  Commits and closes file.
-     */
-    void close() throws IOException {
-        if (!dirty.isEmpty()) {
-            commit();
-        }
-        txnMgr.shutdown();
-        if ( transactionsDisabled ) {
-          txnMgr.removeLogFile();
-        }
-
-        if (!inTxn.isEmpty()) {
-            showList(inTxn.values().iterator());
-            throw new Error("In transaction not empty");
-        }
-
-        // these actually ain't that bad in a production release
-        if (!dirty.isEmpty()) {
-            System.out.println("ERROR: dirty blocks at close time");
-            showList(dirty.values().iterator());
-            throw new Error("Dirty blocks at close time");
-        }
-        if (!inUse.isEmpty()) {
-            System.out.println("ERROR: inUse blocks at close time");
-            showList(inUse.values().iterator());
-            throw new Error("inUse blocks at close time");
-        }
-
-        // debugging stuff to keep an eye on the free list
-        // System.out.println("Free list size:" + free.size());
-        file.close();
-        file = null;
-    }
-
-
-    /**
-     * Force closing the file and underlying transaction manager.
-     * Used for testing purposed only.
-     */
-    void forceClose() throws IOException {
-      txnMgr.forceClose();
-      file.close();
-    }
-
-    /**
-     *  Prints contents of a list
-     */
-    private void showList(Iterator i) {
-        int cnt = 0;
-        while (i.hasNext()) {
-            System.out.println("elem " + cnt + ": " + i.next());
-            cnt++;
-        }
-    }
-
-
-    /**
-     *  Returns a new node. The node is retrieved (and removed)
-     *  from the released list or created new.
-     */
-    private BlockIo getNewNode(long blockid)
-    throws IOException {
-
-        BlockIo retval = null;
-        if (!free.isEmpty()) {
-            retval = (BlockIo) free.removeFirst();
-        }
-        if (retval == null)
-            retval = new BlockIo(0, new byte[BLOCK_SIZE]);
-
-        retval.setBlockId(blockid);
-        retval.setView(null);
-        return retval;
-    }
-
-    /**
-     *  Synchs a node to disk. This is called by the transaction manager's
-     *  synchronization code.
-     */
-    void synch(BlockIo node) throws IOException {
-        byte[] data = node.getData();
-        if (data != null) {
-            long offset = node.getBlockId() * BLOCK_SIZE;
-            file.seek(offset);
-            file.write(data);
-        }
-    }
-
-    /**
-     *  Releases a node from the transaction list, if it was sitting
-     *  there.
-     *
-     *  @param recycle true if block data can be reused
-     */
-    void releaseFromTransaction(BlockIo node, boolean recycle)
-    throws IOException {
-        Long key = new Long(node.getBlockId());
-        if ((inTxn.remove(key) != null) && recycle) {
-            free.add(node);
-        }
-    }
-
-    /**
-     *  Synchronizes the file.
-     */
-    void sync() throws IOException {
-        file.getFD().sync();
-    }
-
-
-    /**
-     * Utility method: Read a block from a RandomAccessFile
-     */
-    private static void read(RandomAccessFile file, long offset,
-                             byte[] buffer, int nBytes) throws IOException {
+  // Todo: reorganize in hashes and fifos as necessary.
+  // free -> inUse -> dirty -> inTxn -> free
+  // free is a cache, thus a FIFO. The rest are hashes.
+  private final LinkedList free = new LinkedList();
+  private final HashMap inUse = new HashMap();
+  private final HashMap dirty = new HashMap();
+  private final HashMap inTxn = new HashMap();
+
+  // transactions disabled?
+  private boolean transactionsDisabled = false;
+
+  /** The length of a single block. */
+  public final static int BLOCK_SIZE = 8192;// 4096;
+
+  /** The extension of a record file */
+  final static String extension = ".db";
+
+  /** A block of clean data to wipe clean pages. */
+  final static byte[] cleanData = new byte[BLOCK_SIZE];
+
+  private RandomAccessFile file;
+  private final String fileName;
+
+  /**
+   * Creates a new object on the indicated filename. The file is opened in
+   * read/write mode.
+   * 
+   * @param fileName
+   *          the name of the file to open or create, without an extension.
+   * @throws IOException
+   *           whenever the creation of the underlying RandomAccessFile throws
+   *           it.
+   */
+  RecordFile(String fileName) throws IOException {
+    this.fileName = fileName;
+    file = new RandomAccessFile(fileName + extension, "rw");
+    txnMgr = new TransactionManager(this);
+  }
+
+  /**
+   * Creates a new object on the indicated filename. The file is opened in
+   * read/write mode.
+   * 
+   * @param fileName
+   *          the name of the file to open or create, without an extension.
+   * @throws IOException
+   *           whenever the creation of the underlying RandomAccessFile throws
+   *           it.
+   */
+  RecordFile(File file) throws IOException {
+    fileName = file.getName();
+    this.file = new RandomAccessFile(file, "rw");
+    txnMgr = new TransactionManager(this);
+  }
+
+  /**
+   * Returns the file name.
+   */
+  String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * Disables transactions: doesn't sync and doesn't use the transaction
+   * manager.
+   */
+  void disableTransactions() {
+    transactionsDisabled = true;
+  }
+
+  /**
+   * Gets a block from the file. The returned byte array is the in-memory copy
+   * of the record, and thus can be written (and subsequently released with a
+   * dirty flag in order to write the block back).
+   * 
+   * @param blockid
+   *          The record number to retrieve.
+   */
+  BlockIo get(long blockid) throws IOException {
+    Long key = new Long(blockid);
+
+    // try in transaction list, dirty list, free list
+    BlockIo node = (BlockIo) inTxn.get(key);
+    if (node != null) {
+      inTxn.remove(key);
+      inUse.put(key, node);
+      return node;
+    }
+    node = (BlockIo) dirty.get(key);
+    if (node != null) {
+      dirty.remove(key);
+      inUse.put(key, node);
+      return node;
+    }
+    for (Iterator i = free.iterator(); i.hasNext();) {
+      BlockIo cur = (BlockIo) i.next();
+      if (cur.getBlockId() == blockid) {
+        node = cur;
+        i.remove();
+        inUse.put(key, node);
+        return node;
+      }
+    }
+
+    // sanity check: can't be on in use list
+    if (inUse.get(key) != null) {
+      throw new Error("double get for block " + blockid);
+    }
+
+    // get a new node and read it from the file
+    node = getNewNode(blockid);
+    long offset = blockid * BLOCK_SIZE;
+    if (file.length() > 0 && offset <= file.length()) {
+      read(file, offset, node.getData(), BLOCK_SIZE);
+    } else {
+      System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
+    }
+    inUse.put(key, node);
+    node.setClean();
+    return node;
+  }
+
+  /**
+   * Releases a block.
+   * 
+   * @param blockid
+   *          The record number to release.
+   * @param isDirty
+   *          If true, the block was modified since the get().
+   */
+  void release(long blockid, boolean isDirty) throws IOException {
+    BlockIo node = (BlockIo) inUse.get(new Long(blockid));
+    if (node == null) {
+      throw new IOException("bad blockid " + blockid + " on release");
+    }
+    if (!node.isDirty() && isDirty) {
+      node.setDirty();
+    }
+    release(node);
+  }
+
+  /**
+   * Releases a block.
+   * 
+   * @param block
+   *          The block to release.
+   */
+  void release(BlockIo block) {
+    Long key = new Long(block.getBlockId());
+    inUse.remove(key);
+    if (block.isDirty()) {
+      // System.out.println( "Dirty: " + key + block );
+      dirty.put(key, block);
+    } else {
+      if (!transactionsDisabled && block.isInTransaction()) {
+        inTxn.put(key, block);
+      } else {
+        free.add(block);
+      }
+    }
+  }
+
+  /**
+   * Discards a block (will not write the block even if it's dirty)
+   * 
+   * @param block
+   *          The block to discard.
+   */
+  void discard(BlockIo block) {
+    Long key = new Long(block.getBlockId());
+    inUse.remove(key);
+
+    // note: block not added to free list on purpose, because
+    // it's considered invalid
+  }
+
+  /**
+   * Commits the current transaction by flushing all dirty buffers to disk.
+   */
+  void commit() throws IOException {
+    // debugging...
+    if (!inUse.isEmpty() && inUse.size() > 1) {
+      showList(inUse.values().iterator());
+      throw new Error("in use list not empty at commit time (" + inUse.size()
+          + ")");
+    }
+
+    // System.out.println("committing...");
+
+    if (dirty.size() == 0) {
+      // if no dirty blocks, skip commit process
+      return;
+    }
+
+    if (!transactionsDisabled) {
+      txnMgr.start();
+    }
+
+    for (Iterator i = dirty.values().iterator(); i.hasNext();) {
+      BlockIo node = (BlockIo) i.next();
+      i.remove();
+      // System.out.println("node " + node + " map size now " + dirty.size());
+      if (transactionsDisabled) {
+        long offset = node.getBlockId() * BLOCK_SIZE;
         file.seek(offset);
-        int remaining = nBytes;
-        int pos = 0;
-        while (remaining > 0) {
-            int read = file.read(buffer, pos, remaining);
-            if (read == -1) {
-                System.arraycopy(cleanData, 0, buffer, pos, remaining);
-                break;
-            }
-            remaining -= read;
-            pos += read;
-        }
+        file.write(node.getData());
+        node.setClean();
+        free.add(node);
+      } else {
+        txnMgr.add(node);
+        inTxn.put(new Long(node.getBlockId()), node);
+      }
+    }
+    if (!transactionsDisabled) {
+      txnMgr.commit();
+    }
+  }
+
+  /**
+   * Rollback the current transaction by discarding all dirty buffers
+   */
+  void rollback() throws IOException {
+    // debugging...
+    if (!inUse.isEmpty()) {
+      showList(inUse.values().iterator());
+      throw new Error("in use list not empty at rollback time (" + inUse.size()
+          + ")");
+    }
+    // System.out.println("rollback...");
+    dirty.clear();
+
+    txnMgr.synchronizeLogFromDisk();
+
+    if (!inTxn.isEmpty()) {
+      showList(inTxn.values().iterator());
+      throw new Error("in txn list not empty at rollback time (" + inTxn.size()
+          + ")");
+    }
+    ;
+  }
+
+  /**
+   * Commits and closes file.
+   */
+  void close() throws IOException {
+    if (!dirty.isEmpty()) {
+      commit();
+    }
+    txnMgr.shutdown();
+    if (transactionsDisabled) {
+      txnMgr.removeLogFile();
+    }
+
+    if (!inTxn.isEmpty()) {
+      showList(inTxn.values().iterator());
+      throw new Error("In transaction not empty");
+    }
+
+    // these actually ain't that bad in a production release
+    if (!dirty.isEmpty()) {
+      System.out.println("ERROR: dirty blocks at close time");
+      showList(dirty.values().iterator());
+      throw new Error("Dirty blocks at close time");
+    }
+    if (!inUse.isEmpty()) {
+      System.out.println("ERROR: inUse blocks at close time");
+      showList(inUse.values().iterator());
+      throw new Error("inUse blocks at close time");
+    }
+
+    // debugging stuff to keep an eye on the free list
+    // System.out.println("Free list size:" + free.size());
+    file.close();
+    file = null;
+  }
+
+  /**
+   * Force closing the file and underlying transaction manager. Used for testing
+   * purposed only.
+   */
+  void forceClose() throws IOException {
+    txnMgr.forceClose();
+    file.close();
+  }
+
+  /**
+   * Prints contents of a list
+   */
+  private void showList(Iterator i) {
+    int cnt = 0;
+    while (i.hasNext()) {
+      System.out.println("elem " + cnt + ": " + i.next());
+      cnt++;
+    }
+  }
+
+  /**
+   * Returns a new node. The node is retrieved (and removed) from the released
+   * list or created new.
+   */
+  private BlockIo getNewNode(long blockid) throws IOException {
+
+    BlockIo retval = null;
+    if (!free.isEmpty()) {
+      retval = (BlockIo) free.removeFirst();
+    }
+    if (retval == null) {
+      retval = new BlockIo(0, new byte[BLOCK_SIZE]);
+    }
+
+    retval.setBlockId(blockid);
+    retval.setView(null);
+    return retval;
+  }
+
+  /**
+   * Synchs a node to disk. This is called by the transaction manager's
+   * synchronization code.
+   */
+  void synch(BlockIo node) throws IOException {
+    byte[] data = node.getData();
+    if (data != null) {
+      long offset = node.getBlockId() * BLOCK_SIZE;
+      file.seek(offset);
+      file.write(data);
+    }
+  }
+
+  /**
+   * Releases a node from the transaction list, if it was sitting there.
+   * 
+   * @param recycle
+   *          true if block data can be reused
+   */
+  void releaseFromTransaction(BlockIo node, boolean recycle) throws IOException {
+    Long key = new Long(node.getBlockId());
+    if ((inTxn.remove(key) != null) && recycle) {
+      free.add(node);
+    }
+  }
+
+  /**
+   * Synchronizes the file.
+   */
+  void sync() throws IOException {
+    file.getFD().sync();
+  }
+
+  /**
+   * Utility method: Read a block from a RandomAccessFile
+   */
+  private static void read(RandomAccessFile file, long offset, byte[] buffer,
+      int nBytes) throws IOException {
+    file.seek(offset);
+    int remaining = nBytes;
+    int pos = 0;
+    while (remaining > 0) {
+      int read = file.read(buffer, pos, remaining);
+      if (read == -1) {
+        System.arraycopy(cleanData, 0, buffer, pos, remaining);
+        break;
+      }
+      remaining -= read;
+      pos += read;
     }
+  }
 
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java Thu Jan 21 10:37:58 2010
@@ -66,60 +66,59 @@
 package org.apache.hadoop.hive.ql.util.jdbm.recman;
 
 /**
- *  The data that comes at the start of a record of data. It stores 
- *  both the current size and the avaliable size for the record - the latter
- *  can be bigger than the former, which allows the record to grow without
- *  needing to be moved and which allows the system to put small records
- *  in larger free spots.
+ * The data that comes at the start of a record of data. It stores both the
+ * current size and the avaliable size for the record - the latter can be bigger
+ * than the former, which allows the record to grow without needing to be moved
+ * and which allows the system to put small records in larger free spots.
  */
 class RecordHeader {
-    // offsets
-    private static final short O_CURRENTSIZE = 0; // int currentSize
-    private static final short O_AVAILABLESIZE = Magic.SZ_INT; // int availableSize
-    static final int SIZE = O_AVAILABLESIZE + Magic.SZ_INT;
-    
-    // my block and the position within the block
-    private BlockIo block;
-    private short pos;
-
-    /**
-     *  Constructs a record header from the indicated data starting at
-     *  the indicated position.
-     */
-    RecordHeader(BlockIo block, short pos) {
-        this.block = block;
-        this.pos = pos;
-        if (pos > (RecordFile.BLOCK_SIZE - SIZE))
-            throw new Error("Offset too large for record header (" 
-                            + block.getBlockId() + ":" 
-                            + pos + ")");
-    }
+  // offsets
+  private static final short O_CURRENTSIZE = 0; // int currentSize
+  private static final short O_AVAILABLESIZE = Magic.SZ_INT; // int
+                                                             // availableSize
+  static final int SIZE = O_AVAILABLESIZE + Magic.SZ_INT;
 
-    /** Returns the current size */
-    int getCurrentSize() {
-        return block.readInt(pos + O_CURRENTSIZE);
-    }
-    
-    /** Sets the current size */
-    void setCurrentSize(int value) {
-        block.writeInt(pos + O_CURRENTSIZE, value);
-    }
-    
-    /** Returns the available size */
-    int getAvailableSize() {
-        return block.readInt(pos + O_AVAILABLESIZE);
-    }
-    
-    /** Sets the available size */
-    void setAvailableSize(int value) {
-        block.writeInt(pos + O_AVAILABLESIZE, value);
-    }
+  // my block and the position within the block
+  private final BlockIo block;
+  private final short pos;
 
-    // overrides java.lang.Object
-    public String toString() {
-        return "RH(" + block.getBlockId() + ":" + pos 
-            + ", avl=" + getAvailableSize()
-            + ", cur=" + getCurrentSize() 
-            + ")";
+  /**
+   * Constructs a record header from the indicated data starting at the
+   * indicated position.
+   */
+  RecordHeader(BlockIo block, short pos) {
+    this.block = block;
+    this.pos = pos;
+    if (pos > (RecordFile.BLOCK_SIZE - SIZE)) {
+      throw new Error("Offset too large for record header ("
+          + block.getBlockId() + ":" + pos + ")");
     }
+  }
+
+  /** Returns the current size */
+  int getCurrentSize() {
+    return block.readInt(pos + O_CURRENTSIZE);
+  }
+
+  /** Sets the current size */
+  void setCurrentSize(int value) {
+    block.writeInt(pos + O_CURRENTSIZE, value);
+  }
+
+  /** Returns the available size */
+  int getAvailableSize() {
+    return block.readInt(pos + O_AVAILABLESIZE);
+  }
+
+  /** Sets the available size */
+  void setAvailableSize(int value) {
+    block.writeInt(pos + O_AVAILABLESIZE, value);
+  }
+
+  // overrides java.lang.Object
+  @Override
+  public String toString() {
+    return "RH(" + block.getBlockId() + ":" + pos + ", avl="
+        + getAvailableSize() + ", cur=" + getCurrentSize() + ")";
+  }
 }