You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [34/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PageManager.java Thu Jan 21 10:37:58 2010
@@ -65,226 +65,225 @@
package org.apache.hadoop.hive.ql.util.jdbm.recman;
-import java.io.*;
+import java.io.IOException;
/**
- * This class manages the linked lists of pages that make up a file.
+ * This class manages the linked lists of pages that make up a file.
*/
final class PageManager {
- // our record file
- private RecordFile file;
- // header data
- private FileHeader header;
- private BlockIo headerBuf;
-
- /**
- * Creates a new page manager using the indicated record file.
- */
- PageManager(RecordFile file) throws IOException {
- this.file = file;
-
- // check the file header. If the magic is 0, we assume a new
- // file. Note that we hold on to the file header node.
- headerBuf = file.get(0);
- if (headerBuf.readShort(0) == 0)
- header = new FileHeader(headerBuf, true);
- else
- header = new FileHeader(headerBuf, false);
- }
-
- /**
- * Allocates a page of the indicated type. Returns recid of the
- * page.
- */
- long allocate(short type) throws IOException {
-
- if (type == Magic.FREE_PAGE)
- throw new Error("allocate of free page?");
-
- // do we have something on the free list?
- long retval = header.getFirstOf(Magic.FREE_PAGE);
- boolean isNew = false;
- if (retval != 0) {
- // yes. Point to it and make the next of that page the
- // new first free page.
- header.setFirstOf(Magic.FREE_PAGE, getNext(retval));
- }
- else {
- // nope. make a new record
- retval = header.getLastOf(Magic.FREE_PAGE);
- if (retval == 0)
- // very new file - allocate record #1
- retval = 1;
- header.setLastOf(Magic.FREE_PAGE, retval + 1);
- isNew = true;
- }
-
- // Cool. We have a record, add it to the correct list
- BlockIo buf = file.get(retval);
- PageHeader pageHdr = isNew ? new PageHeader(buf, type)
- : PageHeader.getView(buf);
- long oldLast = header.getLastOf(type);
-
- // Clean data.
- System.arraycopy(RecordFile.cleanData, 0,
- buf.getData(), 0,
- RecordFile.BLOCK_SIZE);
- pageHdr.setType(type);
- pageHdr.setPrev(oldLast);
- pageHdr.setNext(0);
-
-
- if (oldLast == 0)
- // This was the first one of this type
- header.setFirstOf(type, retval);
- header.setLastOf(type, retval);
- file.release(retval, true);
-
- // If there's a previous, fix up its pointer
- if (oldLast != 0) {
- buf = file.get(oldLast);
- pageHdr = PageHeader.getView(buf);
- pageHdr.setNext(retval);
- file.release(oldLast, true);
- }
-
- // remove the view, we have modified the type.
- buf.setView(null);
-
- return retval;
- }
-
- /**
- * Frees a page of the indicated type.
- */
- void free(short type, long recid) throws IOException {
- if (type == Magic.FREE_PAGE)
- throw new Error("free free page?");
- if (recid == 0)
- throw new Error("free header page?");
-
- // get the page and read next and previous pointers
- BlockIo buf = file.get(recid);
- PageHeader pageHdr = PageHeader.getView(buf);
- long prev = pageHdr.getPrev();
- long next = pageHdr.getNext();
-
- // put the page at the front of the free list.
- pageHdr.setType(Magic.FREE_PAGE);
- pageHdr.setNext(header.getFirstOf(Magic.FREE_PAGE));
- pageHdr.setPrev(0);
-
- header.setFirstOf(Magic.FREE_PAGE, recid);
- file.release(recid, true);
-
- // remove the page from its old list
- if (prev != 0) {
- buf = file.get(prev);
- pageHdr = PageHeader.getView(buf);
- pageHdr.setNext(next);
- file.release(prev, true);
- }
- else {
- header.setFirstOf(type, next);
- }
- if (next != 0) {
- buf = file.get(next);
- pageHdr = PageHeader.getView(buf);
- pageHdr.setPrev(prev);
- file.release(next, true);
- }
- else {
- header.setLastOf(type, prev);
- }
-
- }
-
-
- /**
- * Returns the page following the indicated block
- */
- long getNext(long block) throws IOException {
- try {
- return PageHeader.getView(file.get(block)).getNext();
- } finally {
- file.release(block, false);
- }
- }
-
- /**
- * Returns the page before the indicated block
- */
- long getPrev(long block) throws IOException {
- try {
- return PageHeader.getView(file.get(block)).getPrev();
- } finally {
- file.release(block, false);
- }
- }
-
- /**
- * Returns the first page on the indicated list.
- */
- long getFirst(short type) throws IOException {
- return header.getFirstOf(type);
- }
-
- /**
- * Returns the last page on the indicated list.
- */
- long getLast(short type) throws IOException {
- return header.getLastOf(type);
- }
-
-
- /**
- * Commit all pending (in-memory) data by flushing the page manager.
- * This forces a flush of all outstanding blocks (this it's an implicit
- * {@link RecordFile#commit} as well).
- */
- void commit() throws IOException {
- // write the header out
- file.release(headerBuf);
- file.commit();
-
- // and obtain it again
- headerBuf = file.get(0);
- header = new FileHeader(headerBuf, false);
- }
-
- /**
- * Flushes the page manager. This forces a flush of all outstanding
- * blocks (this it's an implicit {@link RecordFile#commit} as well).
- */
- void rollback() throws IOException {
- // release header
- file.discard(headerBuf);
- file.rollback();
- // and obtain it again
- headerBuf = file.get(0);
- if (headerBuf.readShort(0) == 0)
- header = new FileHeader(headerBuf, true);
- else
- header = new FileHeader(headerBuf, false);
- }
-
- /**
- * Closes the page manager. This flushes the page manager and releases
- * the lock on the header.
- */
- void close() throws IOException {
- file.release(headerBuf);
- file.commit();
- headerBuf = null;
- header = null;
- file = null;
- }
-
- /**
- * Returns the file header.
- */
- FileHeader getFileHeader() {
- return header;
- }
-
+ // the record file whose pages this manager links together
+ private RecordFile file;
+ // parsed file header, and the block (block 0) it is read from
+ private FileHeader header;
+ private BlockIo headerBuf;
+
+ /**
+ * Creates a new page manager over the indicated record file.
+ */
+ PageManager(RecordFile file) throws IOException {
+ this.file = file;
+
+ // check the file header. If the magic is 0, we assume a new
+ // file. Block 0 is held (not released) until commit/rollback/close.
+ headerBuf = file.get(0);
+ if (headerBuf.readShort(0) == 0) {
+ header = new FileHeader(headerBuf, true);
+ } else {
+ header = new FileHeader(headerBuf, false);
+ }
+ }
+
+ /**
+ * Allocates a page of the indicated type (reusing a free page if any); returns its recid.
+ */
+ long allocate(short type) throws IOException {
+
+ if (type == Magic.FREE_PAGE) {
+ throw new Error("allocate of free page?");
+ }
+
+ // do we have something on the free list?
+ long retval = header.getFirstOf(Magic.FREE_PAGE);
+ boolean isNew = false;
+ if (retval != 0) {
+ // yes. Point to it and make the next of that page the
+ // new first free page.
+ header.setFirstOf(Magic.FREE_PAGE, getNext(retval));
+ } else {
+ // nope. take the next unused block number (kept in the free list's last slot)
+ retval = header.getLastOf(Magic.FREE_PAGE);
+ if (retval == 0) {
+ // very new file - allocate record #1 (block 0 is the file header)
+ retval = 1;
+ }
+ header.setLastOf(Magic.FREE_PAGE, retval + 1);
+ isNew = true;
+ }
+
+ // We have a block; append it to the tail of this type's list.
+ BlockIo buf = file.get(retval);
+ PageHeader pageHdr = isNew ? new PageHeader(buf, type) : PageHeader
+ .getView(buf);
+ long oldLast = header.getLastOf(type);
+
+ // Overwrite the block with RecordFile.cleanData before setting the header.
+ System.arraycopy(RecordFile.cleanData, 0, buf.getData(), 0,
+ RecordFile.BLOCK_SIZE);
+ pageHdr.setType(type);
+ pageHdr.setPrev(oldLast);
+ pageHdr.setNext(0);
+
+ if (oldLast == 0) {
+ // This was the first one of this type
+ header.setFirstOf(type, retval);
+ }
+ header.setLastOf(type, retval);
+ file.release(retval, true);
+
+ // If there's a previous, fix up its next pointer
+ if (oldLast != 0) {
+ buf = file.get(oldLast);
+ pageHdr = PageHeader.getView(buf);
+ pageHdr.setNext(retval);
+ file.release(oldLast, true);
+ }
+
+ // remove the view, we have modified the type. NOTE(review): when oldLast != 0, buf here is the previous page's block, not retval's.
+ buf.setView(null);
+
+ return retval;
+ }
+
+ /**
+ * Frees a page of the indicated type, moving it to the head of the free list.
+ */
+ void free(short type, long recid) throws IOException {
+ if (type == Magic.FREE_PAGE) {
+ throw new Error("free free page?");
+ }
+ if (recid == 0) {
+ throw new Error("free header page?");
+ }
+
+ // get the page and read next and previous pointers
+ BlockIo buf = file.get(recid);
+ PageHeader pageHdr = PageHeader.getView(buf);
+ long prev = pageHdr.getPrev();
+ long next = pageHdr.getNext();
+
+ // put the page at the front of the free list.
+ pageHdr.setType(Magic.FREE_PAGE);
+ pageHdr.setNext(header.getFirstOf(Magic.FREE_PAGE));
+ pageHdr.setPrev(0);
+
+ header.setFirstOf(Magic.FREE_PAGE, recid);
+ file.release(recid, true);
+
+ // unlink the page from its old list (updating the header at either end)
+ if (prev != 0) {
+ buf = file.get(prev);
+ pageHdr = PageHeader.getView(buf);
+ pageHdr.setNext(next);
+ file.release(prev, true);
+ } else {
+ header.setFirstOf(type, next);
+ }
+ if (next != 0) {
+ buf = file.get(next);
+ pageHdr = PageHeader.getView(buf);
+ pageHdr.setPrev(prev);
+ file.release(next, true);
+ } else {
+ header.setLastOf(type, prev);
+ }
+
+ }
+
+ /**
+ * Returns the page following the indicated block (0 if it is the last one).
+ */
+ long getNext(long block) throws IOException {
+ try {
+ return PageHeader.getView(file.get(block)).getNext();
+ } finally {
+ file.release(block, false);
+ }
+ }
+
+ /**
+ * Returns the page before the indicated block (0 if it is the first one).
+ */
+ long getPrev(long block) throws IOException {
+ try {
+ return PageHeader.getView(file.get(block)).getPrev();
+ } finally {
+ file.release(block, false);
+ }
+ }
+
+ /**
+ * Returns the first page on the indicated list (0 if the list is empty).
+ */
+ long getFirst(short type) throws IOException {
+ return header.getFirstOf(type);
+ }
+
+ /**
+ * Returns the last page on the indicated list (0 if the list is empty).
+ */
+ long getLast(short type) throws IOException {
+ return header.getLastOf(type);
+ }
+
+ /**
+ * Commits all pending (in-memory) data by flushing the page manager. This
+ * forces a flush of all outstanding blocks (so it's an implicit
+ * {@link RecordFile#commit} as well).
+ */
+ void commit() throws IOException {
+ // write the header out
+ file.release(headerBuf);
+ file.commit();
+
+ // re-acquire the header block (block 0) so the manager keeps holding it
+ headerBuf = file.get(0);
+ header = new FileHeader(headerBuf, false);
+ }
+
+ /**
+ * Rolls back the page manager: discards the in-memory header block and
+ * calls {@link RecordFile#rollback}, then re-reads the header from block 0.
+ */
+ void rollback() throws IOException {
+ // discard the header block rather than releasing it
+ file.discard(headerBuf);
+ file.rollback();
+ // re-read the header; a zero magic short is treated as a new file
+ headerBuf = file.get(0);
+ if (headerBuf.readShort(0) == 0) {
+ header = new FileHeader(headerBuf, true);
+ } else {
+ header = new FileHeader(headerBuf, false);
+ }
+ }
+
+ /**
+ * Closes the page manager: releases the header block, commits the record
+ * file, and clears this manager's references (it is unusable afterwards).
+ */
+ void close() throws IOException {
+ file.release(headerBuf);
+ file.commit();
+ headerBuf = null;
+ header = null;
+ file = null;
+ }
+
+ /**
+ * Returns the file header, or null after {@link #close()}.
+ */
+ FileHeader getFileHeader() {
+ return header;
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowId.java Thu Jan 21 10:37:58 2010
@@ -66,48 +66,48 @@
package org.apache.hadoop.hive.ql.util.jdbm.recman;
/**
- * A physical rowid is nothing else than a pointer to a physical location
- * in a file - a (block, offset) tuple.
- * <P>
- * <B>Note</B>: The fact that the offset is modelled as a short limits
- * the block size to 32k.
+ * A physical rowid is nothing more than a pointer to a physical location in a
+ * file - a (block, offset) tuple.
+ * <P>
+ * <B>Note</B>: The fact that the offset is modelled as a short limits the block
+ * size to 32k.
*/
class PhysicalRowId {
+ // field offsets, relative to pos, within the serialized rowid
+ private static final short O_BLOCK = 0; // long: block number
+ private static final short O_OFFSET = Magic.SZ_LONG; // short: offset within that block
+ static final int SIZE = O_OFFSET + Magic.SZ_SHORT; // total size of one serialized rowid
+
+ // the block holding the serialized rowid, and its position within it
+ BlockIo block;
+ short pos;
+
+ /**
+ * Constructs a physical rowid view over the indicated block, starting at
+ * the indicated position.
+ */
+ PhysicalRowId(BlockIo block, short pos) {
+ this.block = block;
+ this.pos = pos;
+ }
+
+ /** Returns the block number */
+ long getBlock() {
+ return block.readLong(pos + O_BLOCK);
+ }
+
+ /** Sets the block number */
+ void setBlock(long value) {
+ block.writeLong(pos + O_BLOCK, value);
+ }
+
+ /** Returns the offset within the block */
+ short getOffset() {
+ return block.readShort(pos + O_OFFSET);
+ }
+
+ /** Sets the offset within the block */
+ void setOffset(short value) {
+ block.writeShort(pos + O_OFFSET, value);
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/PhysicalRowIdManager.java Thu Jan 21 10:37:58 2010
@@ -68,289 +68,267 @@
import java.io.IOException;
/**
- * This class manages physical row ids, and their data.
+ * This class manages physical row ids, and their data.
*/
-final class PhysicalRowIdManager
-{
+final class PhysicalRowIdManager {
- // The file we're talking to and the associated page manager.
- private RecordFile file;
- private PageManager pageman;
- private FreePhysicalRowIdPageManager freeman;
-
- /**
- * Creates a new rowid manager using the indicated record file.
- * and page manager.
- */
- PhysicalRowIdManager( RecordFile file, PageManager pageManager )
- throws IOException
- {
- this.file = file;
- this.pageman = pageManager;
- this.freeman = new FreePhysicalRowIdPageManager(file, pageman);
- }
-
- /**
- * Inserts a new record. Returns the new physical rowid.
- */
- Location insert( byte[] data, int start, int length )
- throws IOException
- {
- Location retval = alloc( length );
- write( retval, data, start, length );
- return retval;
- }
-
- /**
- * Updates an existing record. Returns the possibly changed
- * physical rowid.
- */
- Location update( Location rowid, byte[] data, int start, int length )
- throws IOException
- {
- // fetch the record header
- BlockIo block = file.get( rowid.getBlock() );
- RecordHeader head = new RecordHeader( block, rowid.getOffset() );
- if ( length > head.getAvailableSize() ) {
- // not enough space - we need to copy to a new rowid.
- file.release( block );
- free( rowid );
- rowid = alloc( length );
- } else {
- file.release( block );
- }
-
- // 'nuff space, write it in and return the rowid.
- write( rowid, data, start, length );
- return rowid;
- }
-
- /**
- * Deletes a record.
- */
- void delete( Location rowid )
- throws IOException
- {
- free( rowid );
- }
-
- /**
- * Retrieves a record.
- */
- byte[] fetch( Location rowid )
- throws IOException
- {
- // fetch the record header
- PageCursor curs = new PageCursor( pageman, rowid.getBlock() );
- BlockIo block = file.get( curs.getCurrent() );
- RecordHeader head = new RecordHeader( block, rowid.getOffset() );
-
- // allocate a return buffer
- byte[] retval = new byte[ head.getCurrentSize() ];
- if ( retval.length == 0 ) {
- file.release( curs.getCurrent(), false );
- return retval;
- }
-
- // copy bytes in
- int offsetInBuffer = 0;
- int leftToRead = retval.length;
- short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
- while ( leftToRead > 0 ) {
- // copy current page's data to return buffer
- int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
- if ( leftToRead < toCopy ) {
- toCopy = leftToRead;
- }
- System.arraycopy( block.getData(), dataOffset,
- retval, offsetInBuffer,
- toCopy );
-
- // Go to the next block
- leftToRead -= toCopy;
- offsetInBuffer += toCopy;
-
- file.release( block );
-
- if ( leftToRead > 0 ) {
- block = file.get( curs.next() );
- dataOffset = DataPage.O_DATA;
- }
-
- }
-
- return retval;
- }
-
- /**
- * Allocate a new rowid with the indicated size.
- */
- private Location alloc( int size )
- throws IOException
- {
- Location retval = freeman.get( size );
- if ( retval == null ) {
- retval = allocNew( size, pageman.getLast( Magic.USED_PAGE ) );
- }
- return retval;
- }
-
- /**
- * Allocates a new rowid. The second parameter is there to
- * allow for a recursive call - it indicates where the search
- * should start.
- */
- private Location allocNew( int size, long start )
- throws IOException
- {
- BlockIo curBlock;
- DataPage curPage;
- if ( start == 0 ) {
- // we need to create a new page.
- start = pageman.allocate( Magic.USED_PAGE );
- curBlock = file.get( start );
- curPage = DataPage.getDataPageView( curBlock );
- curPage.setFirst( DataPage.O_DATA );
- RecordHeader hdr = new RecordHeader( curBlock, DataPage.O_DATA );
- hdr.setAvailableSize( 0 );
- hdr.setCurrentSize( 0 );
- } else {
- curBlock = file.get( start );
- curPage = DataPage.getDataPageView( curBlock );
- }
-
- // follow the rowids on this page to get to the last one. We don't
- // fall off, because this is the last page, remember?
- short pos = curPage.getFirst();
- if ( pos == 0 ) {
- // page is exactly filled by the last block of a record
- file.release( curBlock );
- return allocNew( size, 0 );
- }
-
- RecordHeader hdr = new RecordHeader( curBlock, pos );
- while ( hdr.getAvailableSize() != 0 && pos < RecordFile.BLOCK_SIZE ) {
- pos += hdr.getAvailableSize() + RecordHeader.SIZE;
- if ( pos == RecordFile.BLOCK_SIZE ) {
- // Again, a filled page.
- file.release( curBlock );
- return allocNew( size, 0 );
- }
-
- hdr = new RecordHeader( curBlock, pos );
- }
-
- if ( pos == RecordHeader.SIZE ) {
- // the last record exactly filled the page. Restart forcing
- // a new page.
- file.release( curBlock );
- }
-
- // we have the position, now tack on extra pages until we've got
- // enough space.
- Location retval = new Location( start, pos );
- int freeHere = RecordFile.BLOCK_SIZE - pos - RecordHeader.SIZE;
- if ( freeHere < size ) {
- // check whether the last page would have only a small bit left.
- // if yes, increase the allocation. A small bit is a record
- // header plus 16 bytes.
- int lastSize = (size - freeHere) % DataPage.DATA_PER_PAGE;
- if (( DataPage.DATA_PER_PAGE - lastSize ) < (RecordHeader.SIZE + 16) ) {
- size += (DataPage.DATA_PER_PAGE - lastSize);
- }
-
- // write out the header now so we don't have to come back.
- hdr.setAvailableSize( size );
- file.release( start, true );
-
- int neededLeft = size - freeHere;
- // Refactor these two blocks!
- while ( neededLeft >= DataPage.DATA_PER_PAGE ) {
- start = pageman.allocate( Magic.USED_PAGE );
- curBlock = file.get( start );
- curPage = DataPage.getDataPageView( curBlock );
- curPage.setFirst( (short) 0 ); // no rowids, just data
- file.release( start, true );
- neededLeft -= DataPage.DATA_PER_PAGE;
- }
- if ( neededLeft > 0 ) {
- // done with whole chunks, allocate last fragment.
- start = pageman.allocate( Magic.USED_PAGE );
- curBlock = file.get( start );
- curPage = DataPage.getDataPageView( curBlock );
- curPage.setFirst( (short) (DataPage.O_DATA + neededLeft) );
- file.release( start, true );
- }
- } else {
- // just update the current page. If there's less than 16 bytes
- // left, we increase the allocation (16 bytes is an arbitrary
- // number).
- if ( freeHere - size <= (16 + RecordHeader.SIZE) ) {
- size = freeHere;
- }
- hdr.setAvailableSize( size );
- file.release( start, true );
- }
- return retval;
-
- }
-
-
- private void free( Location id )
- throws IOException
- {
- // get the rowid, and write a zero current size into it.
- BlockIo curBlock = file.get( id.getBlock() );
- DataPage curPage = DataPage.getDataPageView( curBlock );
- RecordHeader hdr = new RecordHeader( curBlock, id.getOffset() );
- hdr.setCurrentSize( 0 );
- file.release( id.getBlock(), true );
-
- // write the rowid to the free list
- freeman.put( id, hdr.getAvailableSize() );
- }
-
- /**
- * Writes out data to a rowid. Assumes that any resizing has been
- * done.
- */
- private void write(Location rowid, byte[] data, int start, int length )
- throws IOException
- {
- PageCursor curs = new PageCursor( pageman, rowid.getBlock() );
- BlockIo block = file.get( curs.getCurrent() );
- RecordHeader hdr = new RecordHeader( block, rowid.getOffset() );
- hdr.setCurrentSize( length );
- if ( length == 0 ) {
- file.release( curs.getCurrent(), true );
- return;
- }
-
- // copy bytes in
- int offsetInBuffer = start;
- int leftToWrite = length;
- short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
- while ( leftToWrite > 0 ) {
- // copy current page's data to return buffer
- int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
-
- if ( leftToWrite < toCopy ) {
- toCopy = leftToWrite;
- }
- System.arraycopy( data, offsetInBuffer, block.getData(),
- dataOffset, toCopy );
-
- // Go to the next block
- leftToWrite -= toCopy;
- offsetInBuffer += toCopy;
-
- file.release( curs.getCurrent(), true );
-
- if ( leftToWrite > 0 ) {
- block = file.get( curs.next() );
- dataOffset = DataPage.O_DATA;
- }
- }
+ // The file we're talking to, its page manager, and the free-slot manager.
+ private final RecordFile file;
+ private final PageManager pageman;
+ private final FreePhysicalRowIdPageManager freeman;
+
+ /**
+ * Creates a new rowid manager using the indicated record file and page
+ * manager; a free physical rowid manager is built on top of both.
+ */
+ PhysicalRowIdManager(RecordFile file, PageManager pageManager)
+ throws IOException {
+ this.file = file;
+ pageman = pageManager;
+ freeman = new FreePhysicalRowIdPageManager(file, pageman);
+ }
+
+ /**
+ * Inserts length bytes of data, starting at start, as a new record; returns its rowid.
+ */
+ Location insert(byte[] data, int start, int length) throws IOException {
+ Location retval = alloc(length);
+ write(retval, data, start, length);
+ return retval;
+ }
+
+ /**
+ * Updates an existing record, relocating it if it no longer fits; returns the (possibly new) rowid.
+ */
+ Location update(Location rowid, byte[] data, int start, int length)
+ throws IOException {
+ // fetch the record header
+ BlockIo block = file.get(rowid.getBlock());
+ RecordHeader head = new RecordHeader(block, rowid.getOffset());
+ if (length > head.getAvailableSize()) {
+ // not enough space: free the old slot and allocate a fresh rowid.
+ file.release(block);
+ free(rowid);
+ rowid = alloc(length);
+ } else {
+ file.release(block);
+ }
+
+ // 'nuff space, write it in and return the rowid.
+ write(rowid, data, start, length);
+ return rowid;
+ }
+
+ /**
+ * Deletes a record, returning its slot to the free list.
+ */
+ void delete(Location rowid) throws IOException {
+ free(rowid);
+ }
+
+ /**
+ * Retrieves a record's current contents, following the page chain if needed.
+ */
+ byte[] fetch(Location rowid) throws IOException {
+ // fetch the record header
+ PageCursor curs = new PageCursor(pageman, rowid.getBlock());
+ BlockIo block = file.get(curs.getCurrent());
+ RecordHeader head = new RecordHeader(block, rowid.getOffset());
+
+ // allocate a return buffer sized to the record's current (not available) size
+ byte[] retval = new byte[head.getCurrentSize()];
+ if (retval.length == 0) {
+ file.release(curs.getCurrent(), false);
+ return retval;
+ }
+
+ // copy bytes out, one page at a time
+ int offsetInBuffer = 0;
+ int leftToRead = retval.length;
+ short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
+ while (leftToRead > 0) {
+ // copy current page's data to return buffer
+ int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
+ if (leftToRead < toCopy) {
+ toCopy = leftToRead;
+ }
+ System.arraycopy(block.getData(), dataOffset, retval, offsetInBuffer,
+ toCopy);
+
+ // Go to the next block
+ leftToRead -= toCopy;
+ offsetInBuffer += toCopy;
+
+ file.release(block);
+
+ if (leftToRead > 0) {
+ block = file.get(curs.next());
+ dataOffset = DataPage.O_DATA;
+ }
+
+ }
+
+ return retval;
+ }
+
+ /**
+ * Allocates a rowid of the indicated size, reusing a free slot when possible.
+ */
+ private Location alloc(int size) throws IOException {
+ Location retval = freeman.get(size);
+ if (retval == null) { // no reusable slot: append at the last used page
+ retval = allocNew(size, pageman.getLast(Magic.USED_PAGE));
+ }
+ return retval;
+ }
+
+ /**
+ * Allocates a new rowid of at least size bytes. start is the page to search
+ * (normally the last used page); 0 forces allocation of a fresh page.
+ */
+ private Location allocNew(int size, long start) throws IOException {
+ BlockIo curBlock;
+ DataPage curPage;
+ if (start == 0) {
+ // we need to create a new page.
+ start = pageman.allocate(Magic.USED_PAGE);
+ curBlock = file.get(start);
+ curPage = DataPage.getDataPageView(curBlock);
+ curPage.setFirst(DataPage.O_DATA);
+ RecordHeader hdr = new RecordHeader(curBlock, DataPage.O_DATA);
+ hdr.setAvailableSize(0);
+ hdr.setCurrentSize(0);
+ } else {
+ curBlock = file.get(start);
+ curPage = DataPage.getDataPageView(curBlock);
+ }
+
+ // follow the rowids on this page to get to the last one. We don't
+ // fall off, because this is the last page, remember?
+ short pos = curPage.getFirst();
+ if (pos == 0) {
+ // page is exactly filled by the last block of a record
+ file.release(curBlock);
+ return allocNew(size, 0);
}
-}
+ RecordHeader hdr = new RecordHeader(curBlock, pos);
+ while (hdr.getAvailableSize() != 0 && pos < RecordFile.BLOCK_SIZE) {
+ pos += hdr.getAvailableSize() + RecordHeader.SIZE;
+ if (pos == RecordFile.BLOCK_SIZE) {
+ // Again, a filled page.
+ file.release(curBlock);
+ return allocNew(size, 0);
+ }
+
+ hdr = new RecordHeader(curBlock, pos);
+ }
+
+ if (pos == RecordHeader.SIZE) {
+ // NOTE(review): meant to restart on a new page, but this only releases
+ // curBlock and falls through; the same block is released again below.
+ file.release(curBlock);
+ }
+
+ // we have the position, now tack on extra pages until we've got
+ // enough space.
+ Location retval = new Location(start, pos);
+ int freeHere = RecordFile.BLOCK_SIZE - pos - RecordHeader.SIZE;
+ if (freeHere < size) {
+ // check whether the last page would have only a small bit left.
+ // if yes, increase the allocation. A small bit is a record
+ // header plus 16 bytes.
+ int lastSize = (size - freeHere) % DataPage.DATA_PER_PAGE;
+ if ((DataPage.DATA_PER_PAGE - lastSize) < (RecordHeader.SIZE + 16)) {
+ size += (DataPage.DATA_PER_PAGE - lastSize);
+ }
+
+ // write out the header now so we don't have to come back.
+ hdr.setAvailableSize(size);
+ file.release(start, true);
+
+ int neededLeft = size - freeHere;
+ // TODO: refactor the two allocation blocks below (whole pages / last fragment).
+ while (neededLeft >= DataPage.DATA_PER_PAGE) {
+ start = pageman.allocate(Magic.USED_PAGE);
+ curBlock = file.get(start);
+ curPage = DataPage.getDataPageView(curBlock);
+ curPage.setFirst((short) 0); // no rowids, just data
+ file.release(start, true);
+ neededLeft -= DataPage.DATA_PER_PAGE;
+ }
+ if (neededLeft > 0) {
+ // done with whole chunks, allocate last fragment.
+ start = pageman.allocate(Magic.USED_PAGE);
+ curBlock = file.get(start);
+ curPage = DataPage.getDataPageView(curBlock);
+ curPage.setFirst((short) (DataPage.O_DATA + neededLeft));
+ file.release(start, true);
+ }
+ } else {
+ // just update the current page. If 16 bytes plus a record header or less
+ // would be left over, we take the whole remainder instead (16 bytes is
+ // an arbitrary number).
+ if (freeHere - size <= (16 + RecordHeader.SIZE)) {
+ size = freeHere;
+ }
+ hdr.setAvailableSize(size);
+ file.release(start, true);
+ }
+ return retval;
+
+ }
+
+ private void free(Location id) throws IOException {
+ // Frees a rowid: zero its current size, then record the slot as free.
+ BlockIo curBlock = file.get(id.getBlock());
+ DataPage.getDataPageView(curBlock); // NOTE(review): result unused; presumably kept for a side effect on the block's view
+ RecordHeader hdr = new RecordHeader(curBlock, id.getOffset());
+ hdr.setCurrentSize(0);
+ file.release(id.getBlock(), true);
+
+ // hand the slot (with its available size) to the free-rowid manager
+ freeman.put(id, hdr.getAvailableSize());
+ }
+
+ /**
+ * Writes data to a rowid, spanning pages as needed; assumes it already fits.
+ */
+ private void write(Location rowid, byte[] data, int start, int length)
+ throws IOException {
+ PageCursor curs = new PageCursor(pageman, rowid.getBlock());
+ BlockIo block = file.get(curs.getCurrent());
+ RecordHeader hdr = new RecordHeader(block, rowid.getOffset());
+ hdr.setCurrentSize(length);
+ if (length == 0) {
+ file.release(curs.getCurrent(), true);
+ return;
+ }
+
+ // copy bytes out of the caller's buffer, one page at a time
+ int offsetInBuffer = start;
+ int leftToWrite = length;
+ short dataOffset = (short) (rowid.getOffset() + RecordHeader.SIZE);
+ while (leftToWrite > 0) {
+ // copy as much of the caller's data as fits in the current page
+ int toCopy = RecordFile.BLOCK_SIZE - dataOffset;
+
+ if (leftToWrite < toCopy) {
+ toCopy = leftToWrite;
+ }
+ System.arraycopy(data, offsetInBuffer, block.getData(), dataOffset,
+ toCopy);
+
+ // Go to the next block
+ leftToWrite -= toCopy;
+ offsetInBuffer += toCopy;
+
+ file.release(curs.getCurrent(), true);
+
+ if (leftToWrite > 0) {
+ block = file.get(curs.next());
+ dataOffset = DataPage.O_DATA;
+ }
+ }
+ }
+}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java Thu Jan 21 10:37:58 2010
@@ -66,86 +66,84 @@
package org.apache.hadoop.hive.ql.util.jdbm.recman;
-import java.io.IOException;
import java.io.File;
+import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerProvider;
-
import org.apache.hadoop.hive.ql.util.jdbm.helper.MRU;
/**
* Provider of the default RecordManager implementation.
- *
+ *
* @author <a href="mailto:boisvert@intalio.com">Alex Boisvert</a>
* @version $Id: Provider.java,v 1.3 2005/06/25 23:12:32 doomdark Exp $
*/
-public final class Provider
- implements RecordManagerProvider
-{
-
- /**
- * Create a default implementation record manager.
- *
- * @param name Name of the record file.
- * @param options Record manager options.
- * @throws IOException if an I/O related exception occurs while creating
- * or opening the record manager.
- * @throws UnsupportedOperationException if some options are not supported by the
- * implementation.
- * @throws IllegalArgumentException if some options are invalid.
- */
- public RecordManager createRecordManager( String name,
- Properties options )
- throws IOException
- {
- RecordManager recman;
-
- recman = new BaseRecordManager( name );
- recman = getCachedRecordManager(recman, options);
- return recman;
- }
-
- private RecordManager getCachedRecordManager(RecordManager recman, Properties options)
- {
- String value;
- int cacheSize;
-
- value = options.getProperty( RecordManagerOptions.DISABLE_TRANSACTIONS, "false" );
- if ( value.equalsIgnoreCase( "TRUE" ) ) {
- ( (BaseRecordManager) recman ).disableTransactions();
- }
-
- value = options.getProperty( RecordManagerOptions.CACHE_SIZE, "1000" );
- cacheSize = Integer.parseInt( value );
-
- value = options.getProperty( RecordManagerOptions.CACHE_TYPE,
- RecordManagerOptions.NORMAL_CACHE );
- if ( value.equalsIgnoreCase( RecordManagerOptions.NORMAL_CACHE ) ) {
- MRU cache = new MRU( cacheSize );
- recman = new CacheRecordManager( recman, cache );
- } else if ( value.equalsIgnoreCase( RecordManagerOptions.SOFT_REF_CACHE ) ) {
- throw new IllegalArgumentException( "Soft reference cache not implemented" );
- } else if ( value.equalsIgnoreCase( RecordManagerOptions.WEAK_REF_CACHE ) ) {
- throw new IllegalArgumentException( "Weak reference cache not implemented" );
- } else if ( value.equalsIgnoreCase(RecordManagerOptions.NO_CACHE) ){
- // do nothing
- } else {
- throw new IllegalArgumentException( "Invalid cache type: " + value );
- }
+public final class Provider implements RecordManagerProvider {
- return recman;
+  /**
+   * Creates the default record manager implementation for the named record
+   * file and wraps it according to the supplied options (transaction and
+   * cache settings).
+   *
+   * @param name
+   *          Name of the record file.
+   * @param options
+   *          Record manager options.
+   * @return the configured record manager.
+   * @throws IOException
+   *           if an I/O related exception occurs while creating or opening the
+   *           record manager.
+   * @throws UnsupportedOperationException
+   *           if some options are not supported by the implementation.
+   * @throws IllegalArgumentException
+   *           if some options are invalid.
+   */
+  public RecordManager createRecordManager(String name, Properties options)
+      throws IOException {
+    return getCachedRecordManager(new BaseRecordManager(name), options);
+  }
+
+ /**
+ * Applies the option-driven configuration to a freshly created record
+ * manager: optionally disables transactions, then wraps it in a cache.
+ * <p>
+ * Options read (with their defaults): DISABLE_TRANSACTIONS ("false"),
+ * CACHE_SIZE ("1000"), CACHE_TYPE (NORMAL_CACHE). NORMAL_CACHE wraps the
+ * manager in a CacheRecordManager backed by an MRU of CACHE_SIZE entries;
+ * NO_CACHE returns it unwrapped.
+ *
+ * @param recman
+ * the record manager to configure; cast to BaseRecordManager when
+ * transactions are disabled.
+ * @param options
+ * record manager options.
+ * @return the (possibly wrapped) record manager.
+ * @throws IllegalArgumentException
+ * for SOFT_REF_CACHE / WEAK_REF_CACHE (not implemented) or an
+ * unknown cache type.
+ */
+ private RecordManager getCachedRecordManager(RecordManager recman,
+ Properties options) {
+ String value;
+ int cacheSize;
+
+ // Case-insensitive "true" turns transactions off on the base manager.
+ value = options.getProperty(RecordManagerOptions.DISABLE_TRANSACTIONS,
+ "false");
+ if (value.equalsIgnoreCase("TRUE")) {
+ ((BaseRecordManager) recman).disableTransactions();
}
- public RecordManager createRecordManager ( File file,
- Properties options )
- throws IOException
- {
- RecordManager recman = new BaseRecordManager(file);
- recman = getCachedRecordManager(recman, options);
- return recman;
+ // NOTE(review): the size is parsed even when NO_CACHE is selected; a
+ // non-numeric value makes Integer.parseInt throw NumberFormatException
+ // (a subclass of IllegalArgumentException).
+ value = options.getProperty(RecordManagerOptions.CACHE_SIZE, "1000");
+ cacheSize = Integer.parseInt(value);
+
+ value = options.getProperty(RecordManagerOptions.CACHE_TYPE,
+ RecordManagerOptions.NORMAL_CACHE);
+ if (value.equalsIgnoreCase(RecordManagerOptions.NORMAL_CACHE)) {
+ MRU cache = new MRU(cacheSize);
+ recman = new CacheRecordManager(recman, cache);
+ } else if (value.equalsIgnoreCase(RecordManagerOptions.SOFT_REF_CACHE)) {
+ throw new IllegalArgumentException("Soft reference cache not implemented");
+ } else if (value.equalsIgnoreCase(RecordManagerOptions.WEAK_REF_CACHE)) {
+ throw new IllegalArgumentException("Weak reference cache not implemented");
+ } else if (value.equalsIgnoreCase(RecordManagerOptions.NO_CACHE)) {
+ // no cache: the record manager is returned unwrapped
+ } else {
+ throw new IllegalArgumentException("Invalid cache type: " + value);
}
+ return recman;
+ }
+
+  /**
+   * Creates the default record manager implementation backed by the given
+   * file and wraps it according to the supplied options (transaction and
+   * cache settings).
+   *
+   * @param file
+   *          the record file.
+   * @param options
+   *          Record manager options.
+   * @return the configured record manager.
+   * @throws IOException
+   *           if an I/O related exception occurs while creating or opening the
+   *           record manager.
+   * @throws IllegalArgumentException
+   *           if some options are invalid.
+   */
+  public RecordManager createRecordManager(File file, Properties options)
+      throws IOException {
+    final RecordManager base = new BaseRecordManager(file);
+    return getCachedRecordManager(base, options);
+  }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordCache.java Thu Jan 21 10:37:58 2010
@@ -68,31 +68,31 @@
import java.io.IOException;
/**
- * This interface is used for synchronization.
- * <p>
- * RecordManager ensures that the cache has the up-to-date information
- * by way of an invalidation protocol.
+ * This interface is used for synchronization.
+ * <p>
+ * RecordManager ensures that the cache has the up-to-date information by way of
+ * an invalidation protocol.
*/
public interface RecordCache {
- /**
- * Notification to flush content related to a given record.
- */
- public void flush(long recid) throws IOException;
-
- /**
- * Notification to flush data all of records.
- */
- public void flushAll() throws IOException;
-
- /**
- * Notification to invalidate content related to given record.
- */
- public void invalidate(long recid) throws IOException;
-
- /**
- * Notification to invalidate content of all records.
- */
- public void invalidateAll() throws IOException;
+ /**
+ * Notification to flush content related to a given record.
+ *
+ * @param recid
+ * id of the record whose cached content should be flushed.
+ * @throws IOException
+ * if an I/O error occurs while flushing.
+ */
+ public void flush(long recid) throws IOException;
+
+ /**
+ * Notification to flush the data of all records.
+ *
+ * @throws IOException
+ * if an I/O error occurs while flushing.
+ */
+ public void flushAll() throws IOException;
+
+ /**
+ * Notification to invalidate content related to a given record.
+ *
+ * @param recid
+ * id of the record whose cached content should be invalidated.
+ * @throws IOException
+ * if an I/O error occurs while invalidating.
+ */
+ public void invalidate(long recid) throws IOException;
+
+ /**
+ * Notification to invalidate the cached content of all records.
+ *
+ * @throws IOException
+ * if an I/O error occurs while invalidating.
+ */
+ public void invalidateAll() throws IOException;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java Thu Jan 21 10:37:58 2010
@@ -65,384 +65,389 @@
package org.apache.hadoop.hive.ql.util.jdbm.recman;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
/**
- * This class represents a random access file as a set of fixed size
- * records. Each record has a physical record number, and records are
- * cached in order to improve access.
+ * This class represents a random access file as a set of fixed size records.
+ * Each record has a physical record number, and records are cached in order to
+ * improve access.
*<p>
- * The set of dirty records on the in-use list constitutes a transaction.
- * Later on, we will send these records to some recovery thingy.
+ * The set of dirty records on the in-use list constitutes a transaction. Later
+ * on, we will send these records to some recovery thingy.
*/
public final class RecordFile {
- final TransactionManager txnMgr;
+ final TransactionManager txnMgr;
- // Todo: reorganize in hashes and fifos as necessary.
- // free -> inUse -> dirty -> inTxn -> free
- // free is a cache, thus a FIFO. The rest are hashes.
- private final LinkedList free = new LinkedList();
- private final HashMap inUse = new HashMap();
- private final HashMap dirty = new HashMap();
- private final HashMap inTxn = new HashMap();
-
- // transactions disabled?
- private boolean transactionsDisabled = false;
-
- /** The length of a single block. */
- public final static int BLOCK_SIZE = 8192;//4096;
-
- /** The extension of a record file */
- final static String extension = ".db";
-
- /** A block of clean data to wipe clean pages. */
- final static byte[] cleanData = new byte[BLOCK_SIZE];
-
- private RandomAccessFile file;
- private final String fileName;
-
- /**
- * Creates a new object on the indicated filename. The file is
- * opened in read/write mode.
- *
- * @param fileName the name of the file to open or create, without
- * an extension.
- * @throws IOException whenever the creation of the underlying
- * RandomAccessFile throws it.
- */
- RecordFile(String fileName) throws IOException {
- this.fileName = fileName;
- file = new RandomAccessFile(fileName + extension, "rw");
- txnMgr = new TransactionManager(this);
- }
-
- /**
- * Creates a new object on the indicated filename. The file is
- * opened in read/write mode.
- *
- * @param fileName the name of the file to open or create, without
- * an extension.
- * @throws IOException whenever the creation of the underlying
- * RandomAccessFile throws it.
- */
- RecordFile(File file) throws IOException {
- this.fileName = file.getName();
- this.file = new RandomAccessFile(file, "rw");
- txnMgr = new TransactionManager(this);
- }
-
- /**
- * Returns the file name.
- */
- String getFileName() {
- return fileName;
- }
-
- /**
- * Disables transactions: doesn't sync and doesn't use the
- * transaction manager.
- */
- void disableTransactions() {
- transactionsDisabled = true;
- }
-
- /**
- * Gets a block from the file. The returned byte array is
- * the in-memory copy of the record, and thus can be written
- * (and subsequently released with a dirty flag in order to
- * write the block back).
- *
- * @param blockid The record number to retrieve.
- */
- BlockIo get(long blockid) throws IOException {
- Long key = new Long(blockid);
-
- // try in transaction list, dirty list, free list
- BlockIo node = (BlockIo) inTxn.get(key);
- if (node != null) {
- inTxn.remove(key);
- inUse.put(key, node);
- return node;
- }
- node = (BlockIo) dirty.get(key);
- if (node != null) {
- dirty.remove(key);
- inUse.put(key, node);
- return node;
- }
- for (Iterator i = free.iterator(); i.hasNext(); ) {
- BlockIo cur = (BlockIo) i.next();
- if (cur.getBlockId() == blockid) {
- node = cur;
- i.remove();
- inUse.put(key, node);
- return node;
- }
- }
-
- // sanity check: can't be on in use list
- if (inUse.get(key) != null) {
- throw new Error("double get for block " + blockid);
- }
-
- // get a new node and read it from the file
- node = getNewNode(blockid);
- long offset = blockid * BLOCK_SIZE;
- if (file.length() > 0 && offset <= file.length()) {
- read(file, offset, node.getData(), BLOCK_SIZE);
- } else {
- System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
- }
- inUse.put(key, node);
- node.setClean();
- return node;
- }
-
-
- /**
- * Releases a block.
- *
- * @param blockid The record number to release.
- * @param isDirty If true, the block was modified since the get().
- */
- void release(long blockid, boolean isDirty)
- throws IOException {
- BlockIo node = (BlockIo) inUse.get(new Long(blockid));
- if (node == null)
- throw new IOException("bad blockid " + blockid + " on release");
- if (!node.isDirty() && isDirty)
- node.setDirty();
- release(node);
- }
-
- /**
- * Releases a block.
- *
- * @param block The block to release.
- */
- void release(BlockIo block) {
- Long key = new Long(block.getBlockId());
- inUse.remove(key);
- if (block.isDirty()) {
- // System.out.println( "Dirty: " + key + block );
- dirty.put(key, block);
- } else {
- if (!transactionsDisabled && block.isInTransaction()) {
- inTxn.put(key, block);
- } else {
- free.add(block);
- }
- }
- }
-
- /**
- * Discards a block (will not write the block even if it's dirty)
- *
- * @param block The block to discard.
- */
- void discard(BlockIo block) {
- Long key = new Long(block.getBlockId());
- inUse.remove(key);
-
- // note: block not added to free list on purpose, because
- // it's considered invalid
- }
-
- /**
- * Commits the current transaction by flushing all dirty buffers
- * to disk.
- */
- void commit() throws IOException {
- // debugging...
- if (!inUse.isEmpty() && inUse.size() > 1) {
- showList(inUse.values().iterator());
- throw new Error("in use list not empty at commit time ("
- + inUse.size() + ")");
- }
-
- // System.out.println("committing...");
-
- if ( dirty.size() == 0 ) {
- // if no dirty blocks, skip commit process
- return;
- }
-
- if (!transactionsDisabled) {
- txnMgr.start();
- }
-
- for (Iterator i = dirty.values().iterator(); i.hasNext(); ) {
- BlockIo node = (BlockIo) i.next();
- i.remove();
- // System.out.println("node " + node + " map size now " + dirty.size());
- if (transactionsDisabled) {
- long offset = node.getBlockId() * BLOCK_SIZE;
- file.seek(offset);
- file.write(node.getData());
- node.setClean();
- free.add(node);
- }
- else {
- txnMgr.add(node);
- inTxn.put(new Long(node.getBlockId()), node);
- }
- }
- if (!transactionsDisabled) {
- txnMgr.commit();
- }
- }
-
- /**
- * Rollback the current transaction by discarding all dirty buffers
- */
- void rollback() throws IOException {
- // debugging...
- if (!inUse.isEmpty()) {
- showList(inUse.values().iterator());
- throw new Error("in use list not empty at rollback time ("
- + inUse.size() + ")");
- }
- // System.out.println("rollback...");
- dirty.clear();
-
- txnMgr.synchronizeLogFromDisk();
-
- if (!inTxn.isEmpty()) {
- showList(inTxn.values().iterator());
- throw new Error("in txn list not empty at rollback time ("
- + inTxn.size() + ")");
- };
- }
-
- /**
- * Commits and closes file.
- */
- void close() throws IOException {
- if (!dirty.isEmpty()) {
- commit();
- }
- txnMgr.shutdown();
- if ( transactionsDisabled ) {
- txnMgr.removeLogFile();
- }
-
- if (!inTxn.isEmpty()) {
- showList(inTxn.values().iterator());
- throw new Error("In transaction not empty");
- }
-
- // these actually ain't that bad in a production release
- if (!dirty.isEmpty()) {
- System.out.println("ERROR: dirty blocks at close time");
- showList(dirty.values().iterator());
- throw new Error("Dirty blocks at close time");
- }
- if (!inUse.isEmpty()) {
- System.out.println("ERROR: inUse blocks at close time");
- showList(inUse.values().iterator());
- throw new Error("inUse blocks at close time");
- }
-
- // debugging stuff to keep an eye on the free list
- // System.out.println("Free list size:" + free.size());
- file.close();
- file = null;
- }
-
-
- /**
- * Force closing the file and underlying transaction manager.
- * Used for testing purposed only.
- */
- void forceClose() throws IOException {
- txnMgr.forceClose();
- file.close();
- }
-
- /**
- * Prints contents of a list
- */
- private void showList(Iterator i) {
- int cnt = 0;
- while (i.hasNext()) {
- System.out.println("elem " + cnt + ": " + i.next());
- cnt++;
- }
- }
-
-
- /**
- * Returns a new node. The node is retrieved (and removed)
- * from the released list or created new.
- */
- private BlockIo getNewNode(long blockid)
- throws IOException {
-
- BlockIo retval = null;
- if (!free.isEmpty()) {
- retval = (BlockIo) free.removeFirst();
- }
- if (retval == null)
- retval = new BlockIo(0, new byte[BLOCK_SIZE]);
-
- retval.setBlockId(blockid);
- retval.setView(null);
- return retval;
- }
-
- /**
- * Synchs a node to disk. This is called by the transaction manager's
- * synchronization code.
- */
- void synch(BlockIo node) throws IOException {
- byte[] data = node.getData();
- if (data != null) {
- long offset = node.getBlockId() * BLOCK_SIZE;
- file.seek(offset);
- file.write(data);
- }
- }
-
- /**
- * Releases a node from the transaction list, if it was sitting
- * there.
- *
- * @param recycle true if block data can be reused
- */
- void releaseFromTransaction(BlockIo node, boolean recycle)
- throws IOException {
- Long key = new Long(node.getBlockId());
- if ((inTxn.remove(key) != null) && recycle) {
- free.add(node);
- }
- }
-
- /**
- * Synchronizes the file.
- */
- void sync() throws IOException {
- file.getFD().sync();
- }
-
-
- /**
- * Utility method: Read a block from a RandomAccessFile
- */
- private static void read(RandomAccessFile file, long offset,
- byte[] buffer, int nBytes) throws IOException {
+  // Todo: reorganize in hashes and fifos as necessary.
+  // Blocks move between these collections as
+  //   free -> inUse -> dirty -> inTxn -> free
+  // free is a cache, thus a FIFO (blocks are appended at the tail and
+  // recycled from the head). The rest are hashes keyed by block id.
+  private final LinkedList<BlockIo> free = new LinkedList<BlockIo>();
+  private final HashMap<Long, BlockIo> inUse = new HashMap<Long, BlockIo>();
+  private final HashMap<Long, BlockIo> dirty = new HashMap<Long, BlockIo>();
+  private final HashMap<Long, BlockIo> inTxn = new HashMap<Long, BlockIo>();
+
+  // transactions disabled?
+  private boolean transactionsDisabled = false;
+
+  /**
+   * The length of a single block, in bytes. Block N lives at file offset
+   * N * BLOCK_SIZE, so presumably changing this breaks existing files.
+   */
+  public static final int BLOCK_SIZE = 8192; // NOTE(review): 4096 was left here as a commented-out alternative
+
+  /** The extension appended to the name passed to RecordFile(String). */
+  static final String extension = ".db";
+
+  /** A block of clean (initially all-zero) data used to wipe pages. */
+  static final byte[] cleanData = new byte[BLOCK_SIZE];
+
+  private RandomAccessFile file;
+  private final String fileName;
+
+  /**
+   * Opens the record file {@code fileName + extension} in read/write mode,
+   * creating it if it does not exist, and attaches a transaction manager.
+   *
+   * @param fileName
+   *          the name of the file to open or create, without an extension.
+   * @throws IOException
+   *           whenever the creation of the underlying RandomAccessFile throws
+   *           it.
+   */
+  RecordFile(String fileName) throws IOException {
+    this.fileName = fileName;
+    this.file = new RandomAccessFile(fileName + extension, "rw");
+    // created last so that it sees fully initialised name and file fields
+    this.txnMgr = new TransactionManager(this);
+  }
+
+  /**
+   * Opens the given file in read/write mode, creating it if it does not
+   * exist, and attaches a transaction manager. Unlike the String
+   * constructor, no extension is appended: the file is used exactly as given.
+   * <p>
+   * The recorded file name is {@code file.getName()}, i.e. the last path
+   * component only (including any extension). NOTE(review): confirm that
+   * users of getFileName() cope with a name lacking the directory part.
+   *
+   * @param file
+   *          the file to open or create.
+   * @throws IOException
+   *           whenever the creation of the underlying RandomAccessFile throws
+   *           it.
+   */
+  RecordFile(File file) throws IOException {
+    this.fileName = file.getName();
+    this.file = new RandomAccessFile(file, "rw");
+    this.txnMgr = new TransactionManager(this);
+  }
+
+  /**
+   * Returns the name recorded at construction time. This is the name passed
+   * to the String constructor (no extension), or {@code File.getName()} of
+   * the file passed to the File constructor.
+   */
+  String getFileName() {
+    return this.fileName;
+  }
+
+  /**
+   * Switches this file to non-transactional mode. Later commits write dirty
+   * blocks straight to the file instead of going through the transaction
+   * manager, and close() removes the transaction log file.
+   */
+  void disableTransactions() {
+    this.transactionsDisabled = true;
+  }
+
+  /**
+   * Gets a block from the file. The returned block is the in-memory copy of
+   * the record and may be modified. Callers hand it back through
+   * {@link #release(long, boolean)}, passing the dirty flag if it was written
+   * to, so that the block gets written back.
+   * <p>
+   * Lookup order: transaction list, dirty list, free (cached) list, and
+   * finally the file itself. If the block lies past the end of the file, or
+   * the file is empty, the buffer is filled from {@code cleanData}.
+   *
+   * @param blockid
+   *          The record number to retrieve.
+   * @return the block, now registered on the in-use list.
+   * @throws IOException
+   *           if reading the file fails.
+   * @throws Error
+   *           if the block is already in use (double get).
+   */
+  BlockIo get(long blockid) throws IOException {
+    Long key = Long.valueOf(blockid);
+
+    // try in transaction list, dirty list, free list; remove() returns the
+    // mapped block (or null), so no separate lookup is needed
+    BlockIo node = (BlockIo) inTxn.remove(key);
+    if (node != null) {
+      inUse.put(key, node);
+      return node;
+    }
+    node = (BlockIo) dirty.remove(key);
+    if (node != null) {
+      inUse.put(key, node);
+      return node;
+    }
+    for (Iterator i = free.iterator(); i.hasNext();) {
+      BlockIo cur = (BlockIo) i.next();
+      if (cur.getBlockId() == blockid) {
+        i.remove();
+        inUse.put(key, cur);
+        return cur;
+      }
+    }
+
+    // sanity check: can't be on in use list
+    if (inUse.get(key) != null) {
+      throw new Error("double get for block " + blockid);
+    }
+
+    // get a new node and read it from the file
+    node = getNewNode(blockid);
+    long offset = blockid * BLOCK_SIZE;
+    if (file.length() > 0 && offset <= file.length()) {
+      read(file, offset, node.getData(), BLOCK_SIZE);
+    } else {
+      // block is beyond the end of the file: start from clean data
+      System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
+    }
+    inUse.put(key, node);
+    node.setClean();
+    return node;
+  }
+
+  /**
+   * Releases a block previously obtained through {@link #get(long)}.
+   *
+   * @param blockid
+   *          The record number to release.
+   * @param isDirty
+   *          If true, the block was modified since the get().
+   * @throws IOException
+   *           if the block is not currently on the in-use list.
+   */
+  void release(long blockid, boolean isDirty) throws IOException {
+    BlockIo node = (BlockIo) inUse.get(Long.valueOf(blockid));
+    if (node == null) {
+      throw new IOException("bad blockid " + blockid + " on release");
+    }
+    if (isDirty && !node.isDirty()) {
+      node.setDirty();
+    }
+    release(node);
+  }
+
+  /**
+   * Releases a block by taking it off the in-use list. It moves to:
+   * <ul>
+   * <li>the dirty list if it was modified;</li>
+   * <li>the transaction list if it is clean, still part of a transaction,
+   * and transactions are enabled;</li>
+   * <li>the free list otherwise.</li>
+   * </ul>
+   *
+   * @param block
+   *          The block to release.
+   */
+  void release(BlockIo block) {
+    Long key = Long.valueOf(block.getBlockId());
+    inUse.remove(key);
+    if (block.isDirty()) {
+      dirty.put(key, block);
+    } else if (!transactionsDisabled && block.isInTransaction()) {
+      inTxn.put(key, block);
+    } else {
+      free.add(block);
+    }
+  }
+
+  /**
+   * Discards a block: removes it from the in-use list without writing it
+   * back, even if it is dirty. The block is deliberately not added to the
+   * free list, because its contents are considered invalid.
+   *
+   * @param block
+   *          The block to discard.
+   */
+  void discard(BlockIo block) {
+    inUse.remove(Long.valueOf(block.getBlockId()));
+  }
+
+ /**
+ * Commits the current transaction by flushing all dirty buffers to disk.
+ * <p>
+ * If there are no dirty blocks this is a no-op. With transactions enabled,
+ * every dirty block is handed to the transaction manager and parked on the
+ * transaction list. With transactions disabled, every dirty block is written
+ * in place in the file (no fsync is issued here), marked clean and put on
+ * the free list.
+ *
+ * @throws Error
+ * if more than one block is still in use.
+ */
+ void commit() throws IOException {
+ // debugging...
+ // At most one block may remain checked out at commit time; presumably the
+ // file header block that PageManager holds on to (confirm). The isEmpty()
+ // test is redundant given size() > 1.
+ if (!inUse.isEmpty() && inUse.size() > 1) {
+ showList(inUse.values().iterator());
+ throw new Error("in use list not empty at commit time (" + inUse.size()
+ + ")");
+ }
+
+ // System.out.println("committing...");
+
+ if (dirty.size() == 0) {
+ // if no dirty blocks, skip commit process
+ return;
+ }
+
+ if (!transactionsDisabled) {
+ txnMgr.start();
+ }
+
+ // Drain the dirty map; Iterator.remove() keeps the iteration safe.
+ for (Iterator i = dirty.values().iterator(); i.hasNext();) {
+ BlockIo node = (BlockIo) i.next();
+ i.remove();
+ // System.out.println("node " + node + " map size now " + dirty.size());
+ if (transactionsDisabled) {
+ // No log: write the block in place at blockId * BLOCK_SIZE.
+ long offset = node.getBlockId() * BLOCK_SIZE;
file.seek(offset);
- int remaining = nBytes;
- int pos = 0;
- while (remaining > 0) {
- int read = file.read(buffer, pos, remaining);
- if (read == -1) {
- System.arraycopy(cleanData, 0, buffer, pos, remaining);
- break;
- }
- remaining -= read;
- pos += read;
- }
+ file.write(node.getData());
+ node.setClean();
+ free.add(node);
+ } else {
+ // Logged: the block stays cached on inTxn until the transaction manager
+ // lets go of it (presumably via releaseFromTransaction).
+ txnMgr.add(node);
+ inTxn.put(new Long(node.getBlockId()), node);
+ }
+ }
+ if (!transactionsDisabled) {
+ txnMgr.commit();
+ }
+ }
+
+  /**
+   * Rolls back the current transaction. All dirty buffers are discarded and
+   * the transaction manager resynchronizes from its on-disk log.
+   * <p>
+   * NOTE(review): unlike commit(), this tolerates no block still in use.
+   *
+   * @throws IOException
+   *           if resynchronizing from the log fails.
+   * @throws Error
+   *           if blocks are still in use, or if blocks remain on the
+   *           transaction list after resynchronization.
+   */
+  void rollback() throws IOException {
+    // debugging...
+    if (!inUse.isEmpty()) {
+      showList(inUse.values().iterator());
+      throw new Error("in use list not empty at rollback time (" + inUse.size()
+          + ")");
+    }
+    dirty.clear();
+
+    // NOTE(review): the transaction manager is expected to drain inTxn here;
+    // confirm against TransactionManager.synchronizeLogFromDisk().
+    txnMgr.synchronizeLogFromDisk();
+
+    if (!inTxn.isEmpty()) {
+      showList(inTxn.values().iterator());
+      throw new Error("in txn list not empty at rollback time (" + inTxn.size()
+          + ")");
+    }
+  }
+
+ /**
+ * Commits and closes file.
+ */
+ void close() throws IOException {
+ if (!dirty.isEmpty()) {
+ commit();
+ }
+ txnMgr.shutdown();
+ if (transactionsDisabled) {
+ txnMgr.removeLogFile();
+ }
+
+ if (!inTxn.isEmpty()) {
+ showList(inTxn.values().iterator());
+ throw new Error("In transaction not empty");
+ }
+
+ // these actually ain't that bad in a production release
+ if (!dirty.isEmpty()) {
+ System.out.println("ERROR: dirty blocks at close time");
+ showList(dirty.values().iterator());
+ throw new Error("Dirty blocks at close time");
+ }
+ if (!inUse.isEmpty()) {
+ System.out.println("ERROR: inUse blocks at close time");
+ showList(inUse.values().iterator());
+ throw new Error("inUse blocks at close time");
+ }
+
+ // debugging stuff to keep an eye on the free list
+ // System.out.println("Free list size:" + free.size());
+ file.close();
+ file = null;
+ }
+
+  /**
+   * Force-closes the transaction manager and then the underlying file,
+   * without committing or checking any block lists. Used for testing
+   * purposes only.
+   */
+  void forceClose() throws IOException {
+    this.txnMgr.forceClose();
+    this.file.close();
+  }
+
+  /**
+   * Prints every element produced by the iterator to stdout, one per line,
+   * prefixed with its zero-based position.
+   */
+  private void showList(Iterator i) {
+    for (int index = 0; i.hasNext(); index++) {
+      System.out.println("elem " + index + ": " + i.next());
+    }
+  }
+
+ /**
+ * Returns a new node. The node is retrieved (and removed) from the released
+ * list or created new.
+ */
+ private BlockIo getNewNode(long blockid) throws IOException {
+
+ BlockIo retval = null;
+ if (!free.isEmpty()) {
+ retval = (BlockIo) free.removeFirst();
+ }
+ if (retval == null) {
+ retval = new BlockIo(0, new byte[BLOCK_SIZE]);
+ }
+
+ retval.setBlockId(blockid);
+ retval.setView(null);
+ return retval;
+ }
+
+ /**
+ * Synchs a node to disk. This is called by the transaction manager's
+ * synchronization code.
+ */
+ void synch(BlockIo node) throws IOException {
+ byte[] data = node.getData();
+ if (data != null) {
+ long offset = node.getBlockId() * BLOCK_SIZE;
+ file.seek(offset);
+ file.write(data);
+ }
+ }
+
+  /**
+   * Removes a node from the transaction list, if it was sitting there, and
+   * optionally recycles it onto the free list.
+   *
+   * @param node
+   *          the block to take off the transaction list.
+   * @param recycle
+   *          true if block data can be reused; only honoured when the node
+   *          was actually on the transaction list.
+   */
+  void releaseFromTransaction(BlockIo node, boolean recycle) throws IOException {
+    Long key = Long.valueOf(node.getBlockId());
+    if (inTxn.remove(key) != null && recycle) {
+      free.add(node);
+    }
+  }
+
+  /**
+   * Synchronizes the file by forcing the underlying file descriptor's
+   * buffers to the storage device.
+   */
+  void sync() throws IOException {
+    this.file.getFD().sync();
+  }
+
+ /**
+ * Utility method: Read a block from a RandomAccessFile.
+ * <p>
+ * Seeks to {@code offset} and reads until {@code nBytes} bytes have been
+ * stored into {@code buffer} starting at index 0. If end of file is reached
+ * first, the rest of those {@code nBytes} is filled from {@code cleanData}.
+ * <p>
+ * NOTE(review): {@code nBytes} must not exceed {@code cleanData.length}
+ * (BLOCK_SIZE) or {@code buffer.length}; the only visible caller passes
+ * BLOCK_SIZE.
+ */
+ private static void read(RandomAccessFile file, long offset, byte[] buffer,
+ int nBytes) throws IOException {
+ file.seek(offset);
+ int remaining = nBytes;
+ int pos = 0;
+ // read() may return fewer bytes than requested, so loop until done or EOF
+ while (remaining > 0) {
+ int read = file.read(buffer, pos, remaining);
+ if (read == -1) {
+ // EOF: pad the unread tail with clean data
+ System.arraycopy(cleanData, 0, buffer, pos, remaining);
+ break;
+ }
+ remaining -= read;
+ pos += read;
}
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordHeader.java Thu Jan 21 10:37:58 2010
@@ -66,60 +66,59 @@
package org.apache.hadoop.hive.ql.util.jdbm.recman;
/**
- * The data that comes at the start of a record of data. It stores
- * both the current size and the avaliable size for the record - the latter
- * can be bigger than the former, which allows the record to grow without
- * needing to be moved and which allows the system to put small records
- * in larger free spots.
+ * The data that comes at the start of a record of data. It stores both the
+ * current size and the available size for the record - the latter can be bigger
+ * than the former, which allows the record to grow without needing to be moved
+ * and which allows the system to put small records in larger free spots.
*/
class RecordHeader {
- // offsets
- private static final short O_CURRENTSIZE = 0; // int currentSize
- private static final short O_AVAILABLESIZE = Magic.SZ_INT; // int availableSize
- static final int SIZE = O_AVAILABLESIZE + Magic.SZ_INT;
-
- // my block and the position within the block
- private BlockIo block;
- private short pos;
-
- /**
- * Constructs a record header from the indicated data starting at
- * the indicated position.
- */
- RecordHeader(BlockIo block, short pos) {
- this.block = block;
- this.pos = pos;
- if (pos > (RecordFile.BLOCK_SIZE - SIZE))
- throw new Error("Offset too large for record header ("
- + block.getBlockId() + ":"
- + pos + ")");
- }
+ // Header layout, as byte offsets from the header's position in its block:
+ // an int currentSize followed by an int availableSize.
+ private static final short O_CURRENTSIZE = 0; // int currentSize
+ private static final short O_AVAILABLESIZE = Magic.SZ_INT; // int availableSize
+ // Total size of the header in bytes.
+ static final int SIZE = O_AVAILABLESIZE + Magic.SZ_INT;
- /** Returns the current size */
- int getCurrentSize() {
- return block.readInt(pos + O_CURRENTSIZE);
- }
-
- /** Sets the current size */
- void setCurrentSize(int value) {
- block.writeInt(pos + O_CURRENTSIZE, value);
- }
-
- /** Returns the available size */
- int getAvailableSize() {
- return block.readInt(pos + O_AVAILABLESIZE);
- }
-
- /** Sets the available size */
- void setAvailableSize(int value) {
- block.writeInt(pos + O_AVAILABLESIZE, value);
- }
+ // The block holding this header and the byte offset of the header within it.
+ private final BlockIo block;
+ private final short pos;
- // overrides java.lang.Object
- public String toString() {
- return "RH(" + block.getBlockId() + ":" + pos
- + ", avl=" + getAvailableSize()
- + ", cur=" + getCurrentSize()
- + ")";
+ /**
+ * Constructs a record header view over {@code block} at byte offset
+ * {@code pos}. Nothing is read from or written to the block here.
+ *
+ * @param block
+ * the block containing the header.
+ * @param pos
+ * byte offset of the header within the block.
+ * @throws Error
+ * if the header would extend past the end of the block, i.e. if
+ * {@code pos > RecordFile.BLOCK_SIZE - SIZE}.
+ */
+ RecordHeader(BlockIo block, short pos) {
+ this.block = block;
+ this.pos = pos;
+ // NOTE(review): negative offsets are not rejected by this check.
+ if (pos > (RecordFile.BLOCK_SIZE - SIZE)) {
+ throw new Error("Offset too large for record header ("
+ + block.getBlockId() + ":" + pos + ")");
}
+ }
+
+ /** Returns the current size */
+ int getCurrentSize() {
+ return block.readInt(pos + O_CURRENTSIZE);
+ }
+
+ /** Sets the current size */
+ void setCurrentSize(int value) {
+ block.writeInt(pos + O_CURRENTSIZE, value);
+ }
+
+ /** Returns the available size */
+ int getAvailableSize() {
+ return block.readInt(pos + O_AVAILABLESIZE);
+ }
+
+ /** Sets the available size */
+ void setAvailableSize(int value) {
+ block.writeInt(pos + O_AVAILABLESIZE, value);
+ }
+
+  /**
+   * Debug representation, e.g. {@code RH(<blockId>:<pos>, avl=<n>, cur=<n>)}.
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder("RH(")
+        .append(block.getBlockId()).append(":").append(pos)
+        .append(", avl=").append(getAvailableSize())
+        .append(", cur=").append(getCurrentSize())
+        .append(")")
+        .toString();
+  }
}