You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2008/11/21 03:37:04 UTC

svn commit: r719453 - in /hadoop/hbase/branches/0.19_on_hadoop_0.18/src: java/org/apache/hadoop/hbase/regionserver/ test/org/apache/hadoop/hbase/regionserver/

Author: apurtell
Date: Thu Nov 20 18:37:04 2008
New Revision: 719453

URL: http://svn.apache.org/viewvc?rev=719453&view=rev
Log:
merge up to trunk (revision 719452)

Modified:
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Thu Nov 20 18:37:04 2008
@@ -244,35 +244,17 @@
         return;
       }
       synchronized (updateLock) {
-        if (this.writer != null) {
-          // Close the current writer, get a new one.
-          try {
-            this.writer.close();
-          } catch (IOException e) {
-            // Failed close of log file.  Means we're losing edits.  For now,
-            // shut ourselves down to minimize loss.  Alternative is to try and
-            // keep going.  See HBASE-930.
-            FailedLogCloseException flce =
-              new FailedLogCloseException("#" + this.filenum);
-            flce.initCause(e);
-            throw e; 
-          }
-          Path p = computeFilename(old_filenum);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Closing current log writer " + FSUtils.getPath(p));
-          }
-          if (filenum > 0) {
-            synchronized (this.sequenceLock) {
-              this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
-            }
-          }
-        }
-        old_filenum = filenum;
-        filenum = System.currentTimeMillis();
-        Path newPath = computeFilename(filenum);
+        // Clean up current writer.
+        Path oldFile = cleanupCurrentWriter();
+        // Create a new one.
+        this.old_filenum = this.filenum;
+        this.filenum = System.currentTimeMillis();
+        Path newPath = computeFilename(this.filenum);
         this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
-            HLogKey.class, HLogEdit.class, getCompressionType(this.conf));
-        LOG.info("New log writer created at " + FSUtils.getPath(newPath));
+          HLogKey.class, HLogEdit.class, getCompressionType(this.conf));
+        LOG.info((oldFile != null?
+          "Closed " + oldFile + ", entries=" + this.numEntries + ". ": "") +
+          "New log writer: " + FSUtils.getPath(newPath));
 
         // Can we delete any of the old log files?
         if (this.outputfiles.size() > 0) {
@@ -286,38 +268,7 @@
             }
             this.outputfiles.clear();
           } else {
-            // Get oldest edit/sequence id.  If logs are older than this id,
-            // then safe to remove.
-            Long oldestOutstandingSeqNum =
-              Collections.min(this.lastSeqWritten.values());
-            // Get the set of all log files whose final ID is older than or
-            // equal to the oldest pending region operation
-            TreeSet<Long> sequenceNumbers =
-              new TreeSet<Long>(this.outputfiles.headMap(
-                (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
-            // Now remove old log files (if any)
-            if (LOG.isDebugEnabled()) {
-              // Find region associated with oldest key -- helps debugging.
-              byte [] oldestRegion = null;
-              for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
-                if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
-                  oldestRegion = e.getKey();
-                  break;
-                }
-              }
-              if (LOG.isDebugEnabled() && sequenceNumbers.size() > 0) {
-                LOG.debug("Found " + sequenceNumbers.size() +
-                  " logs to remove " +
-                  "using oldest outstanding seqnum of " +
-                  oldestOutstandingSeqNum + " from region " +
-                  Bytes.toString(oldestRegion));
-              }
-            }
-            if (sequenceNumbers.size() > 0) {
-              for (Long seq : sequenceNumbers) {
-                deleteLogFile(this.outputfiles.remove(seq), seq);
-              }
-            }
+            cleanOldLogs();
           }
         }
         this.numEntries = 0;
@@ -328,6 +279,73 @@
     }
   }
   
+  /*
+   * Delete logs in outputfiles whose key is at or below the min of lastSeqWritten.
+   * @throws IOException
+   */
+  private void cleanOldLogs() throws IOException {
+    // Get oldest edit/sequence id.  Logs at or older than this id are safe to
+    // remove.  lastSeqWritten must be non-empty: Collections.min throws if not.
+    Long oldestOutstandingSeqNum =
+      Collections.min(this.lastSeqWritten.values());
+    // Get the set of all log files whose final ID is older than or equal to the
+    // oldest pending region operation; headMap excludes its bound, hence + 1.
+    TreeSet<Long> sequenceNumbers =
+      new TreeSet<Long>(this.outputfiles.headMap(
+        (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
+    // Now remove old log files (if any), reporting what was found first.
+    if (LOG.isDebugEnabled()) {
+      // Find a region whose last sequence id is the oldest -- helps debugging.
+      byte [] oldestRegion = null;
+      for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
+        if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
+          oldestRegion = e.getKey();
+          break;
+        }
+      }
+      LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+        " out of total " + this.outputfiles.size() + "; " +
+        "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
+        " from region " + Bytes.toString(oldestRegion));
+    }
+    if (sequenceNumbers.size() > 0) {
+      for (Long seq : sequenceNumbers) {
+        deleteLogFile(this.outputfiles.remove(seq), seq);
+      }
+    }
+  }
+
+  /*
+   * Closes the current writer, if any, and adds its file to outputfiles.
+   * Presumes we're operating inside an updateLock scope.
+   * @return Path computed for the closed writer's file, or null if no writer.
+   * @throws IOException FailedLogCloseException (with cause) if close fails.
+   */
+  private Path cleanupCurrentWriter() throws IOException {
+    Path oldFile = null;
+    if (this.writer != null) {
+      // Close the current writer; the caller creates the new one.
+      try {
+        this.writer.close();
+      } catch (IOException e) {
+        // Failed close of log file means we're losing edits.  Throw the wrapper
+        // so callers catching FailedLogCloseException can react (HBASE-930).
+        FailedLogCloseException flce =
+          new FailedLogCloseException("#" + this.filenum);
+        flce.initCause(e);
+        throw flce;
+      }
+      // NOTE(review): old_filenum is advanced only after we return -- right file?
+      oldFile = computeFilename(old_filenum);
+      if (filenum > 0) {
+        synchronized (this.sequenceLock) {
+          this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), oldFile);
+        }
+      }
+    }
+    return oldFile;
+  }
+
   private void deleteLogFile(final Path p, final Long seqno) throws IOException {
     LOG.info("removing old log file " + FSUtils.getPath(p) +
       " whose highest sequence/edit id is " + seqno);
@@ -626,8 +644,9 @@
   }
   
   /**
-   * Split up a bunch of log files, that are no longer being written to, into
-   * new files, one per region. Delete the old log files when finished.
+   * Split up a bunch of regionserver commit log files that are no longer
+   * being written to into new files, one per region, for each region to
+   * replay on startup. Delete the old log files when finished.
    *
    * @param rootDir qualified root directory of the HBase instance
    * @param srcDir Directory of log files to split: e.g.
@@ -636,19 +655,42 @@
    * @param conf HBaseConfiguration
    * @throws IOException
    */
-  public static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
-    Configuration conf) throws IOException {
+  public static void splitLog(final Path rootDir, final Path srcDir,
+      final FileSystem fs, final Configuration conf)
+  throws IOException {
     if (!fs.exists(srcDir)) {
       // Nothing to do
       return;
     }
-    FileStatus logfiles[] = fs.listStatus(srcDir);
+    FileStatus [] logfiles = fs.listStatus(srcDir);
     if (logfiles == null || logfiles.length == 0) {
       // Nothing to do
       return;
     }
-    LOG.info("splitting " + logfiles.length + " log(s) in " +
+    LOG.info("Splitting " + logfiles.length + " log(s) in " +
       srcDir.toString());
+    splitLog(rootDir, logfiles, fs, conf);
+    try {
+      fs.delete(srcDir, true);
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      IOException io = new IOException("Cannot delete: " + srcDir);
+      io.initCause(e);
+      throw io;
+    }
+    LOG.info("log file splitting completed for " + srcDir.toString());
+  }
+  
+  /*
+   * @param rootDir
+   * @param logfiles
+   * @param fs
+   * @param conf
+   * @throws IOException
+   */
+  private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
+    final FileSystem fs, final Configuration conf)
+  throws IOException {
     Map<byte [], SequenceFile.Writer> logWriters =
       new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
     try {
@@ -743,16 +785,6 @@
         w.close();
       }
     }
-
-    try {
-      fs.delete(srcDir, true);
-    } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
-      IOException io = new IOException("Cannot delete: " + srcDir);
-      io.initCause(e);
-      throw io;
-    }
-    LOG.info("log file splitting completed for " + srcDir.toString());
   }
 
   /**

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Nov 20 18:37:04 2008
@@ -110,6 +110,11 @@
   static final Random rand = new Random();
   static final Log LOG = LogFactory.getLog(HRegion.class);
   final AtomicBoolean closed = new AtomicBoolean(false);
+  /* Closing can take some time; use the closing flag if there is stuff we don't want
+   * to do while in closing state, e.g. offering this region up to the master as a region
+   * to close if the carrying regionserver is overloaded.  Once set, it is never cleared.
+   */
+  private final AtomicBoolean closing = new AtomicBoolean(false);
   private final RegionHistorian historian;
 
   //////////////////////////////////////////////////////////////////////////////
@@ -330,6 +335,13 @@
   }
   
   /**
+   * @return True if closing process has started.
+   */
+  public boolean isClosing() {
+    return this.closing.get();
+  }
+  
+  /**
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't 
    * service any more calls.
    *
@@ -365,6 +377,7 @@
       LOG.warn("region " + this + " already closed");
       return null;
     }
+    this.closing.set(true);
     synchronized (splitLock) {
       synchronized (writestate) {
         // Disable compacting and flushing by background threads for this
@@ -419,7 +432,6 @@
             result.addAll(store.close());
           }
           this.closed.set(true);
-          
           LOG.info("Closed " + this);
           return result;
         } finally {

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Nov 20 18:37:04 2008
@@ -296,8 +296,7 @@
       long lastMsg = 0;
       // Now ask master what it wants us to do and tell it what we have done
       for (int tries = 0; !stopRequested.get() && isHealthy();) {
-        // Try to get the root region location from the master. 
-        if (!haveRootRegion.get()) {
+        // Try to get the root region location from the master.
           HServerAddress rootServer = hbaseMaster.getRootRegionLocation();
           if (rootServer != null) {
             // By setting the root region location, we bypass the wait imposed on
@@ -306,8 +305,7 @@
                 new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootServer));
             haveRootRegion.set(true);
           }
-        }
-        long now = System.currentTimeMillis();
+          long now = System.currentTimeMillis();
         if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) {
           // It has been way too long since we last reported to the master.
           LOG.warn("unable to report to master for " + (now - lastMsg) +
@@ -565,7 +563,6 @@
    * only called when the HRegionServer receives a kill signal.
    */
   private static class ShutdownThread extends Thread {
-    private final Log LOG = LogFactory.getLog(this.getClass());
     private final HRegionServer instance;
     
     /**
@@ -591,7 +588,6 @@
    * compaction.
    */
   private static class MajorCompactionChecker extends Chore {
-    private final Log LOG = LogFactory.getLog(this.getClass());
     private final HRegionServer instance;
     
     MajorCompactionChecker(final HRegionServer h,
@@ -617,7 +613,7 @@
         }
       }
     }
-  };
+  }
   
   /**
    * Report the status of the server. A server is online once all the startup 
@@ -1762,6 +1758,9 @@
     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
     synchronized (onlineRegions) {
       for (HRegion r : onlineRegions.values()) {
+        if (r.isClosed() || r.isClosing()) {
+          continue;
+        }
         if (regions.size() < numRegionsToReport) {
           regions.add(r.getRegionInfo());
         } else {

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Nov 20 18:37:04 2008
@@ -908,7 +908,7 @@
           this.compactionDir,  this.info, family.getName(), -1L, null);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Started compaction of " + rdrs.size() + " file(s)" +
-          (references? "(hasReferences=true)": " ") + " into " +
+          (references? ", hasReferences=true,": " ") + " into " +
           FSUtils.getPath(compactedOutputFile.getMapFilePath()));
       }
       MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java Thu Nov 20 18:37:04 2008
@@ -21,13 +21,12 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,11 +35,12 @@
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapFile;
 
 /**
  * Scanner scans both the memcache and the HStore
  */
-class HStoreScanner implements InternalScanner {
+class HStoreScanner implements InternalScanner,  ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(HStoreScanner.class);
 
   private InternalScanner[] scanners;
@@ -50,6 +50,15 @@
   private boolean multipleMatchers = false;
   private RowFilterInterface dataFilter;
   private HStore store;
+  private final long timestamp;
+  private final byte [][] targetCols;
+  
+  // Indices for memcache scanner and hstorefile scanner.
+  private static final int MEMS_INDEX = 0;
+  private static final int HSFS_INDEX = MEMS_INDEX + 1;
+  
+  // Used around transition from no storefile to the first.
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   
   /** Create an Scanner with a handle on the memcache and HStore files. */
   @SuppressWarnings("unchecked")
@@ -64,51 +73,72 @@
     this.scanners = new InternalScanner[2];
     this.resultSets = new TreeMap[scanners.length];
     this.keys = new HStoreKey[scanners.length];
+    // Save these args in case we need them later handling change in readers
+    // See updateReaders below.
+    this.timestamp = timestamp;
+    this.targetCols = targetCols;
 
     try {
-      scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
-      scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);
-      for (int i = 0; i < scanners.length; i++) {
-        if (scanners[i].isWildcardScanner()) {
-          this.wildcardMatch = true;
-        }
-        if (scanners[i].isMultipleMatchScanner()) {
-          this.multipleMatchers = true;
-        }
-      }
-    } catch(IOException e) {
-      for (int i = 0; i < this.scanners.length; i++) {
-        if(scanners[i] != null) {
-          closeScanner(i);
-        }
+      scanners[MEMS_INDEX] =
+        store.memcache.getScanner(timestamp, targetCols, firstRow);
+      scanners[HSFS_INDEX] =
+        new StoreFileScanner(store, timestamp, targetCols, firstRow);
+      for (int i = MEMS_INDEX; i < scanners.length; i++) {
+        checkScannerFlags(i);
       }
+    } catch (IOException e) {
+      doClose();
       throw e;
     }
     
     // Advance to the first key in each scanner.
     // All results will match the required column-set and scanTime.
-    for (int i = 0; i < scanners.length; i++) {
-      keys[i] = new HStoreKey();
-      resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
-      if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
-        closeScanner(i);
-      }
+    for (int i = MEMS_INDEX; i < scanners.length; i++) {
+      setupScanner(i);
+    }
+    
+    this.store.addChangedReaderObserver(this);
+  }
+  
+  /* Sets wildcardMatch/multipleMatchers if scanner i reports them; never clears.
+   * @param i Index into scanners; scanners[i] must be non-null.
+   */
+  private void checkScannerFlags(final int i) {
+    if (this.scanners[i].isWildcardScanner()) {
+      this.wildcardMatch = true;
+    }
+    if (this.scanners[i].isMultipleMatchScanner()) {
+      this.multipleMatchers = true;
+    }
+  }
+  
+  /*
+   * Allocate key/results for scanner i, advance it once; close it if next() is false.
+   * @param i Index into scanners, keys and resultSets; a null scanner is skipped.
+   * @throws IOException
+   */
+  private void setupScanner(final int i) throws IOException {
+    this.keys[i] = new HStoreKey();
+    this.resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+    if (this.scanners[i] != null && !this.scanners[i].next(this.keys[i], this.resultSets[i])) {
+      closeScanner(i);
     }
   }
 
   /** @return true if the scanner is a wild card scanner */
   public boolean isWildcardScanner() {
-    return wildcardMatch;
+    return this.wildcardMatch;
   }
 
   /** @return true if the scanner is a multiple match scanner */
   public boolean isMultipleMatchScanner() {
-    return multipleMatchers;
+    return this.multipleMatchers;
   }
 
   public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
-    throws IOException {
-
+  throws IOException {
+    this.lock.readLock().lock();
+    try {
     // Filtered flag is set by filters.  If a cell has been 'filtered out'
     // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
     boolean filtered = true;
@@ -243,6 +273,9 @@
     }
     
     return moreToFollow;
+    } finally {
+      this.lock.readLock().unlock();
+    }
   }
   
   /** Shut down a single scanner */
@@ -261,10 +294,43 @@
   }
 
   public void close() {
-    for(int i = 0; i < scanners.length; i++) {
-      if(scanners[i] != null) {
+    this.store.deleteChangedReaderObserver(this);
+    doClose();
+  }
+  
+  private void doClose() {
+    for (int i = MEMS_INDEX; i < scanners.length; i++) {
+      if (scanners[i] != null) {
         closeScanner(i);
       }
     }
   }
-}
+  
+  // Implementation of ChangedReadersObserver: under the write lock, adds a
+  // StoreFileScanner if there is none yet and the store now has readers.
+  public void updateReaders() throws IOException {
+    this.lock.writeLock().lock();
+    try {
+      MapFile.Reader [] readers = this.store.getReaders();
+      if (this.scanners[HSFS_INDEX] == null && readers != null &&
+          readers.length > 0) {
+        // Presume that we went from no readers to at least one -- need to put
+        // a StoreFileScanner in place.
+        try {
+          // Start at the memcache scanner's current row.  NOTE(review): assumes
+          // keys[MEMS_INDEX] is non-null here (memcache scanner still open) -- confirm.
+          this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
+              this.timestamp, this. targetCols, this.keys[MEMS_INDEX].getRow());
+          checkScannerFlags(HSFS_INDEX);
+          setupScanner(HSFS_INDEX);
+          LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");
+        } catch (IOException e) {
+          doClose();
+          throw e;
+        }
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Thu Nov 20 18:37:04 2008
@@ -61,8 +61,6 @@
       }
       rollLock.lock();          // Don't interrupt us. We're working
       try {
-        LOG.info("Rolling hlog. Number of entries: " +
-            server.getLog().getNumEntries());
         server.getLog().rollWriter();
       } catch (FailedLogCloseException e) {
         LOG.fatal("Forcing server shutdown", e);

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java?rev=719453&r1=719452&r2=719453&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java Thu Nov 20 18:37:04 2008
@@ -22,25 +22,30 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.SortedMap;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.dfs.MiniDFSCluster;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.HBaseTestCase;
-import org.apache.hadoop.hbase.HRegionInfo;
-
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
 
 /**
  * Test of a long-lived scanner validating as we go.
  */
 public class TestScanner extends HBaseTestCase {
-  private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  
+  private static final byte [] FIRST_ROW =
+    HConstants.EMPTY_START_ROW;
   private static final byte [][] COLS = {
       HConstants.COLUMN_FAMILY
   };
@@ -52,7 +57,8 @@
   
   private static final byte [] ROW_KEY =
     HRegionInfo.ROOT_REGIONINFO.getRegionName();
-  private static final HRegionInfo REGION_INFO = HRegionInfo.ROOT_REGIONINFO;
+  private static final HRegionInfo REGION_INFO =
+    HRegionInfo.ROOT_REGIONINFO;
   
   private static final long START_CODE = Long.MAX_VALUE;
 
@@ -84,8 +90,7 @@
   
   /** Use a scanner to get the region info and then validate the results */
   private void scan(boolean validateStartcode, String serverName)
-      throws IOException {
-    
+  throws IOException {  
     InternalScanner scanner = null;
     TreeMap<byte [], Cell> results =
       new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
@@ -140,7 +145,55 @@
     byte [] bytes = region.get(ROW_KEY, HConstants.COL_REGIONINFO).getValue();
     validateRegionInfo(bytes);  
   }
+  
+  /**
+   * HBase-910.
+   * @throws Exception
+   */
+  public void testScanAndConcurrentFlush() throws Exception {
+    this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
+    HRegionIncommon hri = new HRegionIncommon(r);
+    try {
+      addContent(hri, Bytes.toString(HConstants.COL_REGIONINFO));
+      int count = count(hri, -1);
+      assertEquals(count, count(hri, 100));
+      assertEquals(count, count(hri, 0));
+      assertEquals(count, count(hri, count - 1));
+    } finally {
+      this.r.close();
+      this.r.getLog().closeAndDelete();
+      shutdownDfs(cluster);
+    }
+  }
  
+  /* Counts rows with a full scan, flushing the cache once mid-scan.
+   * @param hri Region
+   * @param flushIndex Flush once this many rows are read; < 1 or > rows: no flush.
+   * @return Count of rows found.
+   * @throws IOException
+   */
+  private int count(final HRegionIncommon hri, final int flushIndex)
+  throws IOException {
+    LOG.info("Taking out counting scan");
+    ScannerIncommon s = hri.getScanner(EXPLICIT_COLS,
+        HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
+    HStoreKey key = new HStoreKey();
+    SortedMap<byte [], Cell> values =
+      new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
+    int count = 0;
+    while (s.next(key, values)) {
+      count++;
+      if (flushIndex == count) {
+        LOG.info("Starting flush at flush index " + flushIndex);
+        hri.flushcache();
+        LOG.info("Finishing flush");
+      }
+    }
+    s.close();
+    LOG.info("Found " + count + " items");
+    return count;
+  }
+
   /** The test!
    * @throws IOException
    */