You are viewing a plain-text version of this content. (The canonical link to the original archived message was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/12/13 21:02:31 UTC

svn commit: r604011 - in /lucene/hadoop/trunk/src/contrib/hbase: CHANGES.txt src/java/org/apache/hadoop/hbase/HMaster.java src/java/org/apache/hadoop/hbase/HRegion.java src/java/org/apache/hadoop/hbase/HRegionServer.java

Author: jimk
Date: Thu Dec 13 12:02:30 2007
New Revision: 604011

URL: http://svn.apache.org/viewvc?rev=604011&view=rev
Log:
HADOOP-2417 Fix critical shutdown problem introduced by HADOOP-2338

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=604011&r1=604010&r2=604011&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Dec 13 12:02:30 2007
@@ -77,7 +77,8 @@
    HADOOP-2396 NPE in HMaster.cancelLease
    HADOOP-2397 The only time that a meta scanner should try to recover a log is
                when the master is starting
-
+   HADOOP-2417 Fix critical shutdown problem introduced by HADOOP-2338
+   
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable
                (Johan Oskarsson via Stack)

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=604011&r1=604010&r2=604011&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Thu Dec 13 12:02:30 2007
@@ -181,16 +181,14 @@
    */
   abstract class BaseScanner extends Chore {
     protected boolean rootRegion;
-    protected final Text tableName;
 
     protected abstract boolean initialScan();
     protected abstract void maintenanceScan();
 
-    BaseScanner(final Text tableName, final int period,
+    BaseScanner(final boolean rootRegion, final int period,
         final AtomicBoolean stop) {
       super(period, stop);
-      this.tableName = tableName;
-      this.rootRegion = tableName.equals(ROOT_TABLE_NAME);
+      this.rootRegion = rootRegion;
     }
     
     @Override
@@ -506,7 +504,7 @@
   class RootScanner extends BaseScanner {
     /** Constructor */
     public RootScanner() {
-      super(HConstants.ROOT_TABLE_NAME, metaRescanInterval, closed);
+      super(true, metaRescanInterval, closed);
     }
 
     private boolean scanRoot() {
@@ -671,7 +669,7 @@
     
     /** Constructor */
     public MetaScanner() {
-      super(HConstants.META_TABLE_NAME, metaRescanInterval, closed);
+      super(false, metaRescanInterval, closed);
     }
 
     private boolean scanOneMetaRegion(MetaRegion region) {
@@ -1182,16 +1180,25 @@
    * regions can shut down.
    */
   private void stopScanners() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("telling root scanner to stop");
+    }
     synchronized(rootScannerLock) {
       if (rootScannerThread.isAlive()) {
         rootScannerThread.interrupt();  // Wake root scanner
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("telling meta scanner to stop");
+    }
     synchronized(metaScannerLock) {
       if (metaScannerThread.isAlive()) {
         metaScannerThread.interrupt();  // Wake meta scanner
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("meta and root scanners notified");
+    }
   }
 
   /*
@@ -1341,18 +1348,23 @@
         }
       } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
         LOG.info("Region server " + serverName + " quiesced");
-        if(quiescedMetaServers.incrementAndGet() == serversToServerInfo.size()) {
-          // If the only servers we know about are meta servers, then we can
-          // proceed with shutdown
-          LOG.info("All user tables quiesced. Proceeding with shutdown");
-          closed.set(true);
-          stopScanners();
-          synchronized(toDoQueue) {
-            toDoQueue.clear();                         // Empty the queue
-            delayedToDoQueue.clear();                  // Empty shut down queue
-            toDoQueue.notifyAll();                     // Wake main thread
-          }
-        }
+        quiescedMetaServers.incrementAndGet();
+      }
+    }
+
+    if(quiescedMetaServers.get() >= serversToServerInfo.size()) {
+      // If the only servers we know about are meta servers, then we can
+      // proceed with shutdown
+      LOG.info("All user tables quiesced. Proceeding with shutdown");
+      closed.set(true);
+      stopScanners();
+      synchronized(toDoQueue) {
+        toDoQueue.clear();                         // Empty the queue
+        delayedToDoQueue.clear();                  // Empty shut down queue
+        toDoQueue.notifyAll();                     // Wake main thread
+      }
+      synchronized (serversToServerInfo) {
+        serversToServerInfo.notifyAll();
       }
     }
 
@@ -1638,7 +1650,7 @@
             " split. New regions are: " + newRegionA.getRegionName() + ", " +
             newRegionB.getRegionName());
 
-        if (region.getTableDesc().getName().equals(META_TABLE_NAME)) {
+        if (region.isMetaTable()) {
           // A meta region has split.
 
           onlineMetaRegions.remove(region.getStartKey());
@@ -2028,7 +2040,7 @@
               serverName + "> (or server is null). Marking unassigned if " +
           "meta and clearing pendingRegions");
 
-          if (info.getTableDesc().getName().equals(META_TABLE_NAME)) {
+          if (info.isMetaTable()) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("removing meta region " + info.getRegionName() +
                   " from online meta regions");

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=604011&r1=604010&r2=604011&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Thu Dec 13 12:02:30 2007
@@ -225,6 +225,7 @@
   protected final long threadWakeFrequency;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Integer updateLock = new Integer(0);
+  private final Integer splitLock = new Integer(0);
   private final long desiredMaxFileSize;
   private final long minSequenceId;
   private final String encodedRegionName;
@@ -381,54 +382,56 @@
       LOG.info("region " + this.regionInfo.getRegionName() + " already closed");
       return null;
     }
-    lock.writeLock().lock();
-    try {
-      synchronized (writestate) {
-        while (writestate.compacting || writestate.flushing) {
-          try {
-            writestate.wait();
-          } catch (InterruptedException iex) {
-            // continue
+    synchronized (splitLock) {
+      lock.writeLock().lock();
+      try {
+        synchronized (writestate) {
+          while (writestate.compacting || writestate.flushing) {
+            try {
+              writestate.wait();
+            } catch (InterruptedException iex) {
+              // continue
+            }
           }
+          // Disable compacting and flushing by background threads for this
+          // region.
+          writestate.writesEnabled = false;
         }
-        // Disable compacting and flushing by background threads for this
-        // region.
-        writestate.writesEnabled = false;
-      }
-      
-      // Wait for active scanners to finish. The write lock we hold will prevent
-      // new scanners from being created.
-      
-      synchronized (activeScannerCount) {
-        while (activeScannerCount.get() != 0) {
-          try {
-            activeScannerCount.wait();
-            
-          } catch (InterruptedException e) {
-            // continue
+
+        // Wait for active scanners to finish. The write lock we hold will prevent
+        // new scanners from being created.
+
+        synchronized (activeScannerCount) {
+          while (activeScannerCount.get() != 0) {
+            try {
+              activeScannerCount.wait();
+
+            } catch (InterruptedException e) {
+              // continue
+            }
           }
         }
+
+        // Write lock means no more row locks can be given out.  Wait on
+        // outstanding row locks to come in before we close so we do not drop
+        // outstanding updates.
+        waitOnRowLocks();
+
+        // Don't flush the cache if we are aborting
+        if (!abort) {
+          internalFlushcache(snapshotMemcaches());
+        }
+
+        List<HStoreFile> result = new ArrayList<HStoreFile>();
+        for (HStore store: stores.values()) {
+          result.addAll(store.close());
+        }
+        this.closed.set(true);
+        LOG.info("closed " + this.regionInfo.getRegionName());
+        return result;
+      } finally {
+        lock.writeLock().unlock();
       }
-      
-      // Write lock means no more row locks can be given out.  Wait on
-      // outstanding row locks to come in before we close so we do not drop
-      // outstanding updates.
-      waitOnRowLocks();
-
-      // Don't flush the cache if we are aborting
-      if (!abort) {
-        internalFlushcache(snapshotMemcaches());
-      }
-
-      List<HStoreFile> result = new ArrayList<HStoreFile>();
-      for (HStore store: stores.values()) {
-        result.addAll(store.close());
-      }
-      this.closed.set(true);
-      LOG.info("closed " + this.regionInfo.getRegionName());
-      return result;
-    } finally {
-      lock.writeLock().unlock();
     }
   }
   
@@ -541,89 +544,91 @@
   HRegion[] splitRegion(final RegionUnavailableListener listener)
     throws IOException {
 
-    Text midKey = new Text();
-    if (!needsSplit(midKey)) {
-      return null;
-    }
-    long startTime = System.currentTimeMillis();
-    Path splits = getSplitsDir();
-    HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
-        this.regionInfo.getStartKey(), midKey);
-    Path dirA = getSplitRegionDir(splits,
-        HRegionInfo.encodeRegionName(regionAInfo.getRegionName()));
-    if(fs.exists(dirA)) {
-      throw new IOException("Cannot split; target file collision at " + dirA);
-    }
-    HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
-        midKey, null);
-    Path dirB = getSplitRegionDir(splits,
-        HRegionInfo.encodeRegionName(regionBInfo.getRegionName()));
-    if(this.fs.exists(dirB)) {
-      throw new IOException("Cannot split; target file collision at " + dirB);
-    }
-
-    // Notify the caller that we are about to close the region. This moves
-    // us to the 'retiring' queue. Means no more updates coming in -- just
-    // whatever is outstanding.
-    if (listener != null) {
-      listener.closing(getRegionName());
-    }
-
-    // Now close the HRegion.  Close returns all store files or null if not
-    // supposed to close (? What to do in this case? Implement abort of close?)
-    // Close also does wait on outstanding rows and calls a flush just-in-case.
-    List<HStoreFile> hstoreFilesToSplit = close();
-    if (hstoreFilesToSplit == null) {
-      LOG.warn("Close came back null (Implement abort of close?)");
-      throw new RuntimeException("close returned empty vector of HStoreFiles");
-    }
-    
-    // Tell listener that region is now closed and that they can therefore
-    // clean up any outstanding references.
-    if (listener != null) {
-      listener.closed(this.getRegionName());
-    }
-    
-    // Split each store file.
-    for(HStoreFile h: hstoreFilesToSplit) {
-      // A reference to the bottom half of the hsf store file.
-      HStoreFile.Reference aReference = new HStoreFile.Reference(
-        this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
-        HStoreFile.Range.bottom);
-      HStoreFile a = new HStoreFile(this.conf, splits,
-          HRegionInfo.encodeRegionName(regionAInfo.getRegionName()),
-          h.getColFamily(), Math.abs(rand.nextLong()), aReference);
-      // Reference to top half of the hsf store file.
-      HStoreFile.Reference bReference = new HStoreFile.Reference(
-        this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
-        HStoreFile.Range.top);
-      HStoreFile b = new HStoreFile(this.conf, splits,
-        HRegionInfo.encodeRegionName(regionBInfo.getRegionName()),
-        h.getColFamily(), Math.abs(rand.nextLong()), bReference);
-      h.splitStoreFile(a, b, this.fs);
-    }
-
-    // Done!
-    // Opening the region copies the splits files from the splits directory
-    // under each region.
-    HRegion regionA =
-      new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
-    regionA.close();
-    HRegion regionB =
-      new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
-    regionB.close();
+    synchronized (splitLock) {
+      Text midKey = new Text();
+      if (closed.get() || !needsSplit(midKey)) {
+        return null;
+      }
+      long startTime = System.currentTimeMillis();
+      Path splits = getSplitsDir();
+      HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
+          this.regionInfo.getStartKey(), midKey);
+      Path dirA = getSplitRegionDir(splits,
+          HRegionInfo.encodeRegionName(regionAInfo.getRegionName()));
+      if(fs.exists(dirA)) {
+        throw new IOException("Cannot split; target file collision at " + dirA);
+      }
+      HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
+          midKey, null);
+      Path dirB = getSplitRegionDir(splits,
+          HRegionInfo.encodeRegionName(regionBInfo.getRegionName()));
+      if(this.fs.exists(dirB)) {
+        throw new IOException("Cannot split; target file collision at " + dirB);
+      }
+
+      // Notify the caller that we are about to close the region. This moves
+      // us to the 'retiring' queue. Means no more updates coming in -- just
+      // whatever is outstanding.
+      if (listener != null) {
+        listener.closing(getRegionName());
+      }
+
+      // Now close the HRegion.  Close returns all store files or null if not
+      // supposed to close (? What to do in this case? Implement abort of close?)
+      // Close also does wait on outstanding rows and calls a flush just-in-case.
+      List<HStoreFile> hstoreFilesToSplit = close();
+      if (hstoreFilesToSplit == null) {
+        LOG.warn("Close came back null (Implement abort of close?)");
+        throw new RuntimeException("close returned empty vector of HStoreFiles");
+      }
+
+      // Tell listener that region is now closed and that they can therefore
+      // clean up any outstanding references.
+      if (listener != null) {
+        listener.closed(this.getRegionName());
+      }
+
+      // Split each store file.
+      for(HStoreFile h: hstoreFilesToSplit) {
+        // A reference to the bottom half of the hsf store file.
+        HStoreFile.Reference aReference = new HStoreFile.Reference(
+            this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
+            HStoreFile.Range.bottom);
+        HStoreFile a = new HStoreFile(this.conf, splits,
+            HRegionInfo.encodeRegionName(regionAInfo.getRegionName()),
+            h.getColFamily(), Math.abs(rand.nextLong()), aReference);
+        // Reference to top half of the hsf store file.
+        HStoreFile.Reference bReference = new HStoreFile.Reference(
+            this.encodedRegionName, h.getFileId(), new HStoreKey(midKey),
+            HStoreFile.Range.top);
+        HStoreFile b = new HStoreFile(this.conf, splits,
+            HRegionInfo.encodeRegionName(regionBInfo.getRegionName()),
+            h.getColFamily(), Math.abs(rand.nextLong()), bReference);
+        h.splitStoreFile(a, b, this.fs);
+      }
+
+      // Done!
+      // Opening the region copies the splits files from the splits directory
+      // under each region.
+      HRegion regionA =
+        new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
+      regionA.close();
+      HRegion regionB =
+        new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
+      regionB.close();
 
-    // Cleanup
-    boolean deleted = fs.delete(splits);    // Get rid of splits directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
+      // Cleanup
+      boolean deleted = fs.delete(splits);    // Get rid of splits directory
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
+      }
+      HRegion regions[] = new HRegion [] {regionA, regionB};
+      LOG.info("Region split of " + this.regionInfo.getRegionName() +
+          " complete; " + "new regions: " + regions[0].getRegionName() + ", " +
+          regions[1].getRegionName() + ". Split took " +
+          StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+      return regions;
     }
-    HRegion regions[] = new HRegion [] {regionA, regionB};
-    LOG.info("Region split of " + this.regionInfo.getRegionName() +
-        " complete; " + "new regions: " + regions[0].getRegionName() + ", " +
-        regions[1].getRegionName() + ". Split took " +
-        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
-    return regions;
   }
   
   /*
@@ -1030,6 +1035,7 @@
    * avoid a bunch of disk activity.
    *
    * @param row
+   * @param ts
    * @return Map<columnName, byte[]> values
    * @throws IOException
    */
@@ -1282,6 +1288,7 @@
    * @param row The row to operate on
    * @param family The column family to match
    * @param timestamp Timestamp to match
+   * @throws IOException
    */
   public void deleteFamily(Text row, Text family, long timestamp)
   throws IOException{

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=604011&r1=604010&r2=604011&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu Dec 13 12:02:30 2007
@@ -79,9 +79,9 @@
   // of HRegionServer in isolation. We use AtomicBoolean rather than
   // plain boolean so we can pass a reference to Chore threads.  Otherwise,
   // Chore threads need to know about the hosting class.
-  protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
+  protected volatile AtomicBoolean stopRequested = new AtomicBoolean(false);
   
-  protected final AtomicBoolean quiesced = new AtomicBoolean(false);
+  protected volatile AtomicBoolean quiesced = new AtomicBoolean(false);
   
   // Go down hard.  Used if file system becomes unavailable and also in
   // debugging and unit tests.
@@ -95,13 +95,13 @@
   private final Random rand = new Random();
   
   // region name -> HRegion
-  protected final SortedMap<Text, HRegion> onlineRegions =
+  protected volatile SortedMap<Text, HRegion> onlineRegions =
     Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
-  protected final Map<Text, HRegion> retiringRegions =
+  protected volatile Map<Text, HRegion> retiringRegions =
     new ConcurrentHashMap<Text, HRegion>();
  
   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final List<HMsg> outboundMsgs =
+  private volatile List<HMsg> outboundMsgs =
     Collections.synchronizedList(new ArrayList<HMsg>());
 
   final int numRetries;
@@ -120,7 +120,7 @@
   private final Leases leases;
   
   // Request counter
-  private final AtomicInteger requestCount = new AtomicInteger();
+  private volatile AtomicInteger requestCount = new AtomicInteger();
   
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
@@ -296,7 +296,7 @@
       // splitting a 'normal' region, and the ROOT table needs to be
       // updated if we are splitting a META region.
       HTable t = null;
-      if (region.getRegionInfo().getTableDesc().getName().equals(META_TABLE_NAME)) {
+      if (region.getRegionInfo().isMetaTable()) {
         // We need to update the root region
         if (this.root == null) {
           this.root = new HTable(conf, ROOT_TABLE_NAME);