You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/03/10 23:36:15 UTC

svn commit: r752288 - in /hadoop/hbase/branches/0.19: ./ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Tue Mar 10 22:36:14 2009
New Revision: 752288

URL: http://svn.apache.org/viewvc?rev=752288&view=rev
Log:
HBASE-1219 and HBASE-1220 Don't reopen file if already open when updating readers underneath scanners

Modified:
    hadoop/hbase/branches/0.19/CHANGES.txt
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java

Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Tue Mar 10 22:36:14 2009
@@ -22,7 +22,11 @@
                to compact when loaded with hundreds of regions
    HBASE-1247  checkAndSave doesn't Write Ahead Log
    HBASE-1243  oldlogfile.dat is screwed, so is it's region
-   HBASE-1169  When a shutdown is requested, stop scanning META regions immediately
+   HBASE-1169  When a shutdown is requested, stop scanning META regions
+               immediately
+   HBASE-1219  Scanners can miss values riding the flush transition
+   HBASE-1220  Don't reopen file if already open when updating readers
+               underneath scanners
 
 
   IMPROVEMENTS

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java Tue Mar 10 22:36:14 2009
@@ -29,7 +29,9 @@
 public interface ChangedReadersObserver {
   /**
    * Notify observers.
+   * @param flushid Flush sequence id for new file.  Acts as identifier for new
+   * file.
    * @throws IOException
    */
-  void updateReaders() throws IOException;
+  void updateReaders(final long flushid) throws IOException;
 }
\ No newline at end of file

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Mar 10 22:36:14 2009
@@ -728,7 +728,7 @@
         this.family.isBlockCacheEnabled()));
       this.storefiles.put(flushid, flushedFile);
       // Tell listeners of the change in readers.
-      notifyChangedReadersObservers();
+      notifyChangedReadersObservers(flushid);
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -736,11 +736,14 @@
 
   /*
    * Notify all observers that set of Readers has changed.
+   * @param flushid The flush id for the file just added.  Acts as new-file
+   * identifier.
    * @throws IOException
    */
-  private void notifyChangedReadersObservers() throws IOException {
+  private void notifyChangedReadersObservers(final long flushid)
+  throws IOException {
     for (ChangedReadersObserver o: this.changedReaderObservers) {
-      o.updateReaders();
+      o.updateReaders(flushid);
     }
   }
 
@@ -1290,7 +1293,7 @@
                   this.family.isBlockCacheEnabled()));
           this.storefiles.put(orderVal, finalCompactedFile);
           // Tell observers that list of Readers has changed.
-          notifyChangedReadersObservers();
+          notifyChangedReadersObservers(orderVal);
           // Finally, delete old store files.
           for (HStoreFile hsf : toDelete.values()) {
             hsf.delete();

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java Tue Mar 10 22:36:14 2009
@@ -35,8 +35,9 @@
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.io.MapFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
 
 /**
  * Scanner scans both the memcache and the HStore
@@ -53,6 +54,7 @@
   private HStore store;
   private final long timestamp;
   private final byte [][] targetCols;
+  private final HStoreKey previousNext;
   
   // Indices for memcache scanner and hstorefile scanner.
   private static final int MEMS_INDEX = 0;
@@ -70,6 +72,8 @@
     long timestamp, RowFilterInterface filter) 
   throws IOException {
     this.store = store;
+    this.previousNext =
+      new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.store.getHRegionInfo());
     this.dataFilter = filter;
     if (null != dataFilter) {
       dataFilter.reset();
@@ -275,7 +279,9 @@
         }
       }
     }
-    
+    // Save a copy of the key we give out so we can use it if we have to
+    // update readers.
+    Writables.copyWritable(key, this.previousNext);
     return moreToFollow;
     } finally {
       this.lock.readLock().unlock();
@@ -313,7 +319,7 @@
   
   // Implementation of ChangedReadersObserver
   
-  public void updateReaders() throws IOException {
+  public void updateReaders(final long sequenceid) throws IOException {
     if (this.closing.get()) {
       return;
     }
@@ -325,10 +331,8 @@
         // Presume that we went from no readers to at least one -- need to put
         // a HStoreScanner in place.
         try {
-          // I think its safe getting key from mem at this stage -- it shouldn't have
-          // been flushed yet
           this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
-              this.timestamp, this. targetCols, this.keys[MEMS_INDEX].getRow());
+            this.timestamp, this. targetCols, this.previousNext.getRow());
           checkScannerFlags(HSFS_INDEX);
           setupScanner(HSFS_INDEX);
           LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");
@@ -336,9 +340,14 @@
           doClose();
           throw e;
         }
+      } else if (this.scanners[HSFS_INDEX] != null) {
+        // There are outstanding store files.  Add in the just flushed file
+        // to the mix.
+        ((StoreFileScanner)this.scanners[HSFS_INDEX]).
+          updateReaders(sequenceid, this.previousNext.getRow());
       }
     } finally {
       this.lock.writeLock().unlock();
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Tue Mar 10 22:36:14 2009
@@ -34,8 +34,7 @@
 /**
  * A scanner that iterates through HStore files
  */
-class StoreFileScanner extends HAbstractScanner
-implements ChangedReadersObserver {
+class StoreFileScanner extends HAbstractScanner {
     // Keys retrieved from the sources
   private volatile HStoreKey keys[];
   // Values that correspond to those keys
@@ -62,9 +61,8 @@
   throws IOException {
     super(timestamp, targetCols);
     this.store = store;
-    this.store.addChangedReaderObserver(this);
     try {
-      openReaders(firstRow);
+      openReaders(firstRow, -1);
     } catch (Exception ex) {
       close();
       IOException e = new IOException("HStoreScanner failed construction");
@@ -79,7 +77,40 @@
    * @param firstRow
    * @throws IOException
    */
-  private void openReaders(final byte [] firstRow) throws IOException {
+  private void openReaders(final byte [] firstRow, final long sequenceid)
+  throws IOException {
+    SortedMap<Long, HStoreFile> storefiles = this.store.getStorefiles();
+    // If just one file was added by a flush, just open the new file.
+    if (this.readers != null &&
+        this.readers.length + 1 == storefiles.size() &&
+        storefiles.lastKey().longValue() == sequenceid) {
+      // Presume that just one file was added.  Slot it in front of all the
+      // others.
+      HStoreFile hsf = storefiles.get(Long.valueOf(sequenceid));
+      if (hsf == null) {
+        LOG.warn("Failed getting file for " + sequenceid +
+          "; falling back on close and reopen of all Readers");
+      } else {
+        // Only add in non-null Readers; count all of them, slot 0 included.
+        int nonulls = 0;
+        for (int i = 0; i < this.readers.length; i++) {
+          if (this.readers[i] != null) nonulls++;
+        }
+        MapFile.Reader [] newReaders = new MapFile.Reader[nonulls + 1];
+        newReaders[0] = hsf.getReader(store.fs, false, false);
+        int j = 1; // Slot 0 holds the new Reader; existing ones follow it.
+        for (int i = 0; i < this.readers.length; i++) {
+          MapFile.Reader r = this.readers[i];
+          if (r != null) {
+            newReaders[j++] = r;
+          }
+        }
+        this.readers = newReaders;
+        advance(firstRow);
+        return;
+      }
+    }
+    // If here, then we are opening all files anew.
     if (this.readers != null) {
       for (int i = 0; i < this.readers.length; i++) {
         if (this.readers[i] != null) {
@@ -88,20 +119,24 @@
       }
     }
     // Open our own copies of the Readers here inside in the scanner.
-    this.readers = new MapFile.Reader[this.store.getStorefiles().size()];
-    
+    this.readers = new MapFile.Reader[storefiles.size()];
+
     // Most recent map file should be first
     int i = readers.length - 1;
-    for(HStoreFile curHSF: store.getStorefiles().values()) {
+    for (HStoreFile curHSF : storefiles.values()) {
       readers[i--] = curHSF.getReader(store.fs, false, false);
     }
-    
+    advance(firstRow);
+  }
+
+  private void advance(final byte [] firstRow) throws IOException {
     this.keys = new HStoreKey[readers.length];
     this.vals = new byte[readers.length][];
-    
+
     // Advance the readers to the first pos.
-    for (i = 0; i < readers.length; i++) {
-      keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.store.getHRegionInfo());
+    for (int i = 0; i < readers.length; i++) {
+      keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.store
+          .getHRegionInfo());
       if (firstRow != null && firstRow.length != 0) {
         if (findFirstRow(i, firstRow)) {
           continue;
@@ -348,7 +383,6 @@
   /** Shut it down! */
   public void close() {
     if (!this.scannerClosed) {
-      this.store.deleteChangedReaderObserver(this);
       try {
         for(int i = 0; i < readers.length; i++) {
           if(readers[i] != null) {
@@ -366,19 +400,19 @@
     }
   }
 
-  // Implementation of ChangedReadersObserver
-  
-  public void updateReaders() throws IOException {
+  /**
+   * Called by hosting {@link HStoreScanner}.
+   * @param sequenceid
+   * @param previousNextRow
+   * @throws IOException
+   */
+  void updateReaders(final long sequenceid, final byte [] previousNextRow)
+  throws IOException {
     this.lock.writeLock().lock();
     try {
-      // The keys are currently lined up at the next row to fetch.  Pass in
-      // the current row as 'first' row and readers will be opened and cue'd
-      // up so future call to next will start here.
-      ViableRow viableRow = getNextViableRow();
-      openReaders(viableRow.getRow());
+      openReaders(previousNextRow, sequenceid);
       LOG.debug("Replaced Scanner Readers at row " +
-        (viableRow == null || viableRow.getRow() == null? "null":
-          Bytes.toString(viableRow.getRow())));
+        (previousNextRow == null? "null": Bytes.toString(previousNextRow)));
     } finally {
       this.lock.writeLock().unlock();
     }

Modified: hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java?rev=752288&r1=752287&r2=752288&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java (original)
+++ hadoop/hbase/branches/0.19/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java Tue Mar 10 22:36:14 2009
@@ -185,6 +185,12 @@
       count++;
       if (flushIndex == count) {
         LOG.info("Starting flush at flush index " + flushIndex);
+        // Add something to flush.
+        addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO),
+          new byte [] {'a', 'b', 'c'}, new byte [] {'d', 'e', 'f'});
+        hri.flushcache();
+        addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO),
+          new byte [] {'a', 'b', 'c'}, new byte [] {'d', 'e', 'f'});
         hri.flushcache();
         LOG.info("Finishing flush");
       }