You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/03/10 23:36:15 UTC
svn commit: r752288 - in /hadoop/hbase/branches/0.19: ./
src/java/org/apache/hadoop/hbase/regionserver/
src/test/org/apache/hadoop/hbase/regionserver/
Author: stack
Date: Tue Mar 10 22:36:14 2009
New Revision: 752288
URL: http://svn.apache.org/viewvc?rev=752288&view=rev
Log:
HBASE-1219 and HBASE-1220 Don't reopen file if already open when updating readers underneath scanners
Modified:
hadoop/hbase/branches/0.19/CHANGES.txt
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Tue Mar 10 22:36:14 2009
@@ -22,7 +22,11 @@
to compact when loaded with hundreds of regions
HBASE-1247 checkAndSave doesn't Write Ahead Log
HBASE-1243 oldlogfile.dat is screwed, so is it's region
- HBASE-1169 When a shutdown is requested, stop scanning META regions immediately
+ HBASE-1169 When a shutdown is requested, stop scanning META regions
+ immediately
+ HBASE-1219 Scanners can miss values riding the flush transition
+ HBASE-1220 Don't reopen file if already open when updating readers
+ underneath scanners
IMPROVEMENTS
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java Tue Mar 10 22:36:14 2009
@@ -29,7 +29,9 @@
public interface ChangedReadersObserver {
/**
* Notify observers.
+ * @param flushid Flush sequence id for new file. Acts as identifier for new
+ * file.
* @throws IOException
*/
- void updateReaders() throws IOException;
+ void updateReaders(final long flushid) throws IOException;
}
\ No newline at end of file
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Mar 10 22:36:14 2009
@@ -728,7 +728,7 @@
this.family.isBlockCacheEnabled()));
this.storefiles.put(flushid, flushedFile);
// Tell listeners of the change in readers.
- notifyChangedReadersObservers();
+ notifyChangedReadersObservers(flushid);
} finally {
this.lock.writeLock().unlock();
}
@@ -736,11 +736,14 @@
/*
* Notify all observers that set of Readers has changed.
+ * @param flushid The flush id for the file just added. Acts as new-file
+ * identifier.
* @throws IOException
*/
- private void notifyChangedReadersObservers() throws IOException {
+ private void notifyChangedReadersObservers(final long flushid)
+ throws IOException {
for (ChangedReadersObserver o: this.changedReaderObservers) {
- o.updateReaders();
+ o.updateReaders(flushid);
}
}
@@ -1290,7 +1293,7 @@
this.family.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile);
// Tell observers that list of Readers has changed.
- notifyChangedReadersObservers();
+ notifyChangedReadersObservers(orderVal);
// Finally, delete old store files.
for (HStoreFile hsf : toDelete.values()) {
hsf.delete();
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java Tue Mar 10 22:36:14 2009
@@ -35,8 +35,9 @@
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.io.MapFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
/**
* Scanner scans both the memcache and the HStore
@@ -53,6 +54,7 @@
private HStore store;
private final long timestamp;
private final byte [][] targetCols;
+ private final HStoreKey previousNext;
// Indices for memcache scanner and hstorefile scanner.
private static final int MEMS_INDEX = 0;
@@ -70,6 +72,8 @@
long timestamp, RowFilterInterface filter)
throws IOException {
this.store = store;
+ this.previousNext =
+ new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.store.getHRegionInfo());
this.dataFilter = filter;
if (null != dataFilter) {
dataFilter.reset();
@@ -275,7 +279,9 @@
}
}
}
-
+ // Save a copy of the key we give out so we can use it if we have to
+ // update readers.
+ Writables.copyWritable(key, this.previousNext);
return moreToFollow;
} finally {
this.lock.readLock().unlock();
@@ -313,7 +319,7 @@
// Implementation of ChangedReadersObserver
- public void updateReaders() throws IOException {
+ public void updateReaders(final long sequenceid) throws IOException {
if (this.closing.get()) {
return;
}
@@ -325,10 +331,8 @@
// Presume that we went from no readers to at least one -- need to put
// a HStoreScanner in place.
try {
- // I think its safe getting key from mem at this stage -- it shouldn't have
- // been flushed yet
this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
- this.timestamp, this. targetCols, this.keys[MEMS_INDEX].getRow());
+ this.timestamp, this. targetCols, this.previousNext.getRow());
checkScannerFlags(HSFS_INDEX);
setupScanner(HSFS_INDEX);
LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");
@@ -336,9 +340,14 @@
doClose();
throw e;
}
+ } else if (this.scanners[HSFS_INDEX] != null) {
+ // There are outstanding store files. Add in the just flushed file
+ // to the mix.
+ ((StoreFileScanner)this.scanners[HSFS_INDEX]).
+ updateReaders(sequenceid, this.previousNext.getRow());
}
} finally {
this.lock.writeLock().unlock();
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Tue Mar 10 22:36:14 2009
@@ -34,8 +34,7 @@
/**
* A scanner that iterates through HStore files
*/
-class StoreFileScanner extends HAbstractScanner
-implements ChangedReadersObserver {
+class StoreFileScanner extends HAbstractScanner {
// Keys retrieved from the sources
private volatile HStoreKey keys[];
// Values that correspond to those keys
@@ -62,9 +61,8 @@
throws IOException {
super(timestamp, targetCols);
this.store = store;
- this.store.addChangedReaderObserver(this);
try {
- openReaders(firstRow);
+ openReaders(firstRow, -1);
} catch (Exception ex) {
close();
IOException e = new IOException("HStoreScanner failed construction");
@@ -79,7 +77,40 @@
* @param firstRow
* @throws IOException
*/
- private void openReaders(final byte [] firstRow) throws IOException {
+ private void openReaders(final byte [] firstRow, final long sequenceid)
+ throws IOException {
+ SortedMap<Long, HStoreFile> storefiles = this.store.getStorefiles();
+ // If just one file was added by a flush, just open the new file.
+ if (this.readers != null &&
+ this.readers.length + 1 == storefiles.size() &&
+ storefiles.lastKey().longValue() == sequenceid) {
+ // Presume that just one file was added. Slot it in front of all the
+ // others.
+ HStoreFile hsf = storefiles.get(Long.valueOf(sequenceid));
+ if (hsf == null) {
+ LOG.warn("Failed getting file for " + sequenceid +
+ "; falling back on close and reopen of all Readers");
+ } else {
+ // Only add in non-null Readers
+ int nonulls = 0;
+ for (int i = 1; i < this.readers.length; i++) {
+ if (this.readers[i] != null) nonulls++;
+ }
+ MapFile.Reader [] newReaders = new MapFile.Reader[nonulls + 1];
+ newReaders[0] = hsf.getReader(store.fs, false, false);
+ int j = 0;
+ for (int i = 1; i < this.readers.length + 1; i++) {
+ MapFile.Reader r = this.readers[i - 1];
+ if (r != null) {
+ newReaders[j++] = r;
+ }
+ }
+ this.readers = newReaders;
+ advance(firstRow);
+ return;
+ }
+ }
+ // If here, then we are opening all files anew.
if (this.readers != null) {
for (int i = 0; i < this.readers.length; i++) {
if (this.readers[i] != null) {
@@ -88,20 +119,24 @@
}
}
// Open our own copies of the Readers here inside in the scanner.
- this.readers = new MapFile.Reader[this.store.getStorefiles().size()];
-
+ this.readers = new MapFile.Reader[storefiles.size()];
+
// Most recent map file should be first
int i = readers.length - 1;
- for(HStoreFile curHSF: store.getStorefiles().values()) {
+ for (HStoreFile curHSF : storefiles.values()) {
readers[i--] = curHSF.getReader(store.fs, false, false);
}
-
+ advance(firstRow);
+ }
+
+ private void advance(final byte [] firstRow) throws IOException {
this.keys = new HStoreKey[readers.length];
this.vals = new byte[readers.length][];
-
+
// Advance the readers to the first pos.
- for (i = 0; i < readers.length; i++) {
- keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.store.getHRegionInfo());
+ for (int i = 0; i < readers.length; i++) {
+ keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.store
+ .getHRegionInfo());
if (firstRow != null && firstRow.length != 0) {
if (findFirstRow(i, firstRow)) {
continue;
@@ -348,7 +383,6 @@
/** Shut it down! */
public void close() {
if (!this.scannerClosed) {
- this.store.deleteChangedReaderObserver(this);
try {
for(int i = 0; i < readers.length; i++) {
if(readers[i] != null) {
@@ -366,19 +400,19 @@
}
}
- // Implementation of ChangedReadersObserver
-
- public void updateReaders() throws IOException {
+ /**
+ * Called by hosting {@link HStoreScanner}.
+ * @param sequenceid
+ * @param previousNextRow
+ * @throws IOException
+ */
+ void updateReaders(final long sequenceid, final byte [] previousNextRow)
+ throws IOException {
this.lock.writeLock().lock();
try {
- // The keys are currently lined up at the next row to fetch. Pass in
- // the current row as 'first' row and readers will be opened and cue'd
- // up so future call to next will start here.
- ViableRow viableRow = getNextViableRow();
- openReaders(viableRow.getRow());
+ openReaders(previousNextRow, sequenceid);
LOG.debug("Replaced Scanner Readers at row " +
- (viableRow == null || viableRow.getRow() == null? "null":
- Bytes.toString(viableRow.getRow())));
+ (previousNextRow == null? "null": Bytes.toString(previousNextRow)));
} finally {
this.lock.writeLock().unlock();
}
Modified: hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java (original)
+++ hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java Tue Mar 10 22:36:14 2009
@@ -185,6 +185,12 @@
count++;
if (flushIndex == count) {
LOG.info("Starting flush at flush index " + flushIndex);
+ // Add something to flush.
+ addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO),
+ new byte [] {'a', 'b', 'c'}, new byte [] {'d', 'e', 'f'});
+ hri.flushcache();
+ addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO),
+ new byte [] {'a', 'b', 'c'}, new byte [] {'d', 'e', 'f'});
hri.flushcache();
LOG.info("Finishing flush");
}