You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2008/02/25 20:50:03 UTC

svn commit: r630968 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/regionserver/

Author: bryanduxbury
Date: Mon Feb 25 11:50:02 2008
New Revision: 630968

URL: http://svn.apache.org/viewvc?rev=630968&view=rev
Log:
HBASE-442 Move internal classes out of HRegionServer

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=630968&r1=630967&r2=630968&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Feb 25 11:50:02 2008
@@ -65,7 +65,8 @@
    HBASE-457   Factor Master into Master, RegionManager, and ServerManager
    HBASE-464   HBASE-419 introduced javadoc errors
    HBASE-468   Move HStoreKey back to o.a.h.h
-
+   HBASE-442   Move internal classes out of HRegionServer
+   
 Branch 0.1
 
   INCOMPATIBLE CHANGES

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,201 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Writables;
+
+/** 
+ * Compact region on request and then run split if appropriate
+ */
+class CompactSplitThread extends Thread 
+implements RegionUnavailableListener, HConstants {
+  static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
+    
+  private HTable root = null;
+  private HTable meta = null;
+  private long startTime;
+  private final long frequency;
+  
+  private HRegionServer server;
+  private HBaseConfiguration conf;
+  
+  private final BlockingQueue<QueueEntry> compactionQueue =
+    new LinkedBlockingQueue<QueueEntry>();
+
+  /** constructor */
+  public CompactSplitThread(HRegionServer server) {
+    super();
+    this.server = server;
+    this.conf = server.conf;
+    this.frequency =
+      conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
+      20 * 1000);
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void run() {
+    while (!server.isStopRequested()) {
+      QueueEntry e = null;
+      try {
+        e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
+        if (e == null) {
+          continue;
+        }
+        e.getRegion().compactIfNeeded();
+        split(e.getRegion());
+      } catch (InterruptedException ex) {
+        continue;
+      } catch (IOException ex) {
+        LOG.error("Compaction failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            RemoteExceptionHandler.checkIOException(ex));
+        if (!server.checkFileSystem()) {
+          break;
+        }
+
+      } catch (Exception ex) {
+        LOG.error("Compaction failed" +
+            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            ex);
+        if (!server.checkFileSystem()) {
+          break;
+        }
+      }
+    }
+    LOG.info(getName() + " exiting");
+  }
+  
+  /**
+   * @param e QueueEntry for region to be compacted
+   */
+  public void compactionRequested(QueueEntry e) {
+    compactionQueue.add(e);
+  }
+  
+  void compactionRequested(final HRegion r) {
+    compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
+  }
+  
+  private void split(final HRegion region) throws IOException {
+    final HRegionInfo oldRegionInfo = region.getRegionInfo();
+    final HRegion[] newRegions = region.splitRegion(this);
+    if (newRegions == null) {
+      // Didn't need to be split
+      return;
+    }
+    
+    // When a region is split, the META table needs to updated if we're
+    // splitting a 'normal' region, and the ROOT table needs to be
+    // updated if we are splitting a META region.
+    HTable t = null;
+    if (region.getRegionInfo().isMetaTable()) {
+      // We need to update the root region
+      if (this.root == null) {
+        this.root = new HTable(conf, ROOT_TABLE_NAME);
+      }
+      t = root;
+    } else {
+      // For normal regions we need to update the meta region
+      if (meta == null) {
+        meta = new HTable(conf, META_TABLE_NAME);
+      }
+      t = meta;
+    }
+    LOG.info("Updating " + t.getTableName() + " with region split info");
+
+    // Mark old region as offline and split in META.
+    // NOTE: there is no need for retry logic here. HTable does it for us.
+    oldRegionInfo.setOffline(true);
+    oldRegionInfo.setSplit(true);
+    BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName());
+    update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
+    update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo()));
+    update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo()));
+    t.commit(update);
+    
+    // Add new regions to META
+    for (int i = 0; i < newRegions.length; i++) {
+      update = new BatchUpdate(newRegions[i].getRegionName());
+      update.put(COL_REGIONINFO, Writables.getBytes(
+        newRegions[i].getRegionInfo()));
+      t.commit(update);
+    }
+        
+    // Now tell the master about the new regions
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reporting region split to master");
+    }
+    server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
+      newRegions[1].getRegionInfo());
+    LOG.info("region split, META updated, and report to master all" +
+      " successful. Old region=" + oldRegionInfo.toString() +
+      ", new regions: " + newRegions[0].toString() + ", " +
+      newRegions[1].toString() + ". Split took " +
+      StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+    
+    // Do not serve the new regions. Let the Master assign them.
+  }
+  
+  /** {@inheritDoc} */
+  public void closing(final Text regionName) {
+    startTime = System.currentTimeMillis();
+    server.getWriteLock().lock();
+    try {
+      // Remove region from regions Map and add it to the Map of retiring
+      // regions.
+      server.setRegionClosing(regionName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(regionName.toString() + " closing (" +
+          "Adding to retiringRegions)");
+      }
+    } finally {
+      server.getWriteLock().unlock();
+    }
+  }
+  
+  /** {@inheritDoc} */
+  public void closed(final Text regionName) {
+    server.getWriteLock().lock();
+    try {
+      server.setRegionClosed(regionName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(regionName.toString() + " closed");
+      }
+    } finally {
+      server.getWriteLock().unlock();
+    }
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.ConcurrentModificationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+
+/** Flush cache upon request */
+class Flusher extends Thread implements CacheFlushListener {
+  static final Log LOG = LogFactory.getLog(Flusher.class);
+  private final DelayQueue<QueueEntry> flushQueue =
+    new DelayQueue<QueueEntry>();
+
+  private final long optionalFlushPeriod;
+  private final HRegionServer server;
+  private final HBaseConfiguration conf;
+  private final Integer lock = new Integer(0);
+  
+  /** constructor */
+  public Flusher(final HRegionServer server) {
+    super();
+    this.server = server;
+    conf = server.conf;
+    this.optionalFlushPeriod = conf.getLong(
+      "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void run() {
+    while (!server.isStopRequested()) {
+      QueueEntry e = null;
+      try {
+        e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS);
+        if (e == null) {
+          continue;
+        }
+        synchronized(lock) { // Don't interrupt while we're working
+          if (e.getRegion().flushcache()) {
+            server.compactionRequested(e);
+          }
+            
+          e.setExpirationTime(System.currentTimeMillis() +
+              optionalFlushPeriod);
+          flushQueue.add(e);
+        }
+        
+        // Now ensure that all the active regions are in the queue
+        Set<HRegion> regions = server.getRegionsToCheck();
+        for (HRegion r: regions) {
+          e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
+          synchronized (flushQueue) {
+            if (!flushQueue.contains(e)) {
+              flushQueue.add(e);
+            }
+          }
+        }
+
+        // Now make sure that the queue only contains active regions
+        synchronized (flushQueue) {
+          for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext();  ) {
+            e = i.next();
+            if (!regions.contains(e.getRegion())) {
+              i.remove();
+            }
+          }
+        }
+      } catch (InterruptedException ex) {
+        continue;
+      } catch (ConcurrentModificationException ex) {
+        continue;
+      } catch (DroppedSnapshotException ex) {
+        // Cache flush can fail in a few places.  If it fails in a critical
+        // section, we get a DroppedSnapshotException and a replay of hlog
+        // is required. Currently the only way to do this is a restart of
+        // the server.
+        LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+        if (!server.checkFileSystem()) {
+          break;
+        }
+        server.stop();
+      } catch (IOException ex) {
+        LOG.error("Cache flush failed" +
+          (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+          RemoteExceptionHandler.checkIOException(ex));
+        if (!server.checkFileSystem()) {
+          break;
+        }
+      } catch (Exception ex) {
+        LOG.error("Cache flush failed" +
+          (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+          ex);
+        if (!server.checkFileSystem()) {
+          break;
+        }
+      }
+    }
+    flushQueue.clear();
+    LOG.info(getName() + " exiting");
+  }
+  
+  /** {@inheritDoc} */
+  public void flushRequested(HRegion region) {
+    QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
+    synchronized (flushQueue) {
+      if (flushQueue.contains(e)) {
+        flushQueue.remove(e);
+      }
+      flushQueue.add(e);
+    }
+  }
+  
+  /**
+   * Only interrupt once it's done with a run through the work loop.
+   */ 
+  void interruptPolitely() {
+    synchronized (lock) {
+      interrupt();
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=630968&r1=630967&r2=630968&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Feb 25 11:50:02 2008
@@ -185,415 +185,18 @@
     
   }
 
-  /** Queue entry passed to flusher, compactor and splitter threads */
-  class QueueEntry implements Delayed {
-    private final HRegion region;
-    private long expirationTime;
-
-    QueueEntry(HRegion region, long expirationTime) {
-      this.region = region;
-      this.expirationTime = expirationTime;
-    }
-    
-    /** {@inheritDoc} */
-    @Override
-    public boolean equals(Object o) {
-      QueueEntry other = (QueueEntry) o;
-      return this.hashCode() == other.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int hashCode() {
-      return this.region.getRegionInfo().hashCode();
-    }
-
-    /** {@inheritDoc} */
-    public long getDelay(TimeUnit unit) {
-      return unit.convert(this.expirationTime - System.currentTimeMillis(),
-          TimeUnit.MILLISECONDS);
-    }
-
-    /** {@inheritDoc} */
-    public int compareTo(Delayed o) {
-      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
-        o.getDelay(TimeUnit.MILLISECONDS);
-
-      int value = 0;
-      if (delta > 0) {
-        value = 1;
-        
-      } else if (delta < 0) {
-        value = -1;
-      }
-      return value;
-    }
-
-    /** @return the region */
-    public HRegion getRegion() {
-      return region;
-    }
-
-    /** @param expirationTime the expirationTime to set */
-    public void setExpirationTime(long expirationTime) {
-      this.expirationTime = expirationTime;
-    }
-  }
-
   // Compactions
   final CompactSplitThread compactSplitThread;
-  // Needed during shutdown so we send an interrupt after completion of a
-  // compaction, not in the midst.
-  final Integer compactSplitLock = new Integer(0);
-
-  /** Compact region on request and then run split if appropriate
-   */
-  private class CompactSplitThread extends Thread
-  implements RegionUnavailableListener {
-    private HTable root = null;
-    private HTable meta = null;
-    private long startTime;
-    private final long frequency;
-    
-    private final BlockingQueue<QueueEntry> compactionQueue =
-      new LinkedBlockingQueue<QueueEntry>();
-
-    /** constructor */
-    public CompactSplitThread() {
-      super();
-      this.frequency =
-        conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
-        20 * 1000);
-    }
-    
-    /** {@inheritDoc} */
-    @Override
-    public void run() {
-      while (!stopRequested.get()) {
-        QueueEntry e = null;
-        try {
-          e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-          if (e == null) {
-            continue;
-          }
-          e.getRegion().compactIfNeeded();
-          split(e.getRegion());
-        } catch (InterruptedException ex) {
-          continue;
-        } catch (IOException ex) {
-          LOG.error("Compaction failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
-              RemoteExceptionHandler.checkIOException(ex));
-          if (!checkFileSystem()) {
-            break;
-          }
-
-        } catch (Exception ex) {
-          LOG.error("Compaction failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
-              ex);
-          if (!checkFileSystem()) {
-            break;
-          }
-        }
-      }
-      LOG.info(getName() + " exiting");
-    }
-    
-    /**
-     * @param e QueueEntry for region to be compacted
-     */
-    public void compactionRequested(QueueEntry e) {
-      compactionQueue.add(e);
-    }
-    
-    void compactionRequested(final HRegion r) {
-      compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
-    }
-    
-    private void split(final HRegion region) throws IOException {
-      final HRegionInfo oldRegionInfo = region.getRegionInfo();
-      final HRegion[] newRegions = region.splitRegion(this);
-      if (newRegions == null) {
-        // Didn't need to be split
-        return;
-      }
-      
-      // When a region is split, the META table needs to updated if we're
-      // splitting a 'normal' region, and the ROOT table needs to be
-      // updated if we are splitting a META region.
-      HTable t = null;
-      if (region.getRegionInfo().isMetaTable()) {
-        // We need to update the root region
-        if (this.root == null) {
-          this.root = new HTable(conf, ROOT_TABLE_NAME);
-        }
-        t = root;
-      } else {
-        // For normal regions we need to update the meta region
-        if (meta == null) {
-          meta = new HTable(conf, META_TABLE_NAME);
-        }
-        t = meta;
-      }
-      LOG.info("Updating " + t.getTableName() + " with region split info");
 
-      // Mark old region as offline and split in META.
-      // NOTE: there is no need for retry logic here. HTable does it for us.
-      oldRegionInfo.setOffline(true);
-      oldRegionInfo.setSplit(true);
-      BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName());
-      update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
-      update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo()));
-      update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo()));
-      t.commit(update);
-      
-      // Add new regions to META
-      for (int i = 0; i < newRegions.length; i++) {
-        update = new BatchUpdate(newRegions[i].getRegionName());
-        update.put(COL_REGIONINFO, Writables.getBytes(
-          newRegions[i].getRegionInfo()));
-        t.commit(update);
-      }
-          
-      // Now tell the master about the new regions
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reporting region split to master");
-      }
-      reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
-        newRegions[1].getRegionInfo());
-      LOG.info("region split, META updated, and report to master all" +
-        " successful. Old region=" + oldRegionInfo.toString() +
-        ", new regions: " + newRegions[0].toString() + ", " +
-        newRegions[1].toString() + ". Split took " +
-        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
-      
-      // Do not serve the new regions. Let the Master assign them.
-    }
-    
-    /** {@inheritDoc} */
-    public void closing(final Text regionName) {
-      startTime = System.currentTimeMillis();
-      lock.writeLock().lock();
-      try {
-        // Remove region from regions Map and add it to the Map of retiring
-        // regions.
-        retiringRegions.put(regionName, onlineRegions.remove(regionName));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(regionName.toString() + " closing (" +
-            "Adding to retiringRegions)");
-        }
-      } finally {
-        lock.writeLock().unlock();
-      }
-    }
-    
-    /** {@inheritDoc} */
-    public void closed(final Text regionName) {
-      lock.writeLock().lock();
-      try {
-        retiringRegions.remove(regionName);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(regionName.toString() + " closed");
-        }
-      } finally {
-        lock.writeLock().unlock();
-      }
-    }
-  }
-  
   // Cache flushing  
   final Flusher cacheFlusher;
-  // Needed during shutdown so we send an interrupt after completion of a
-  // flush, not in the midst.
-  final Integer cacheFlusherLock = new Integer(0);
   
-  /** Flush cache upon request */
-  class Flusher extends Thread implements CacheFlushListener {
-    private final DelayQueue<QueueEntry> flushQueue =
-      new DelayQueue<QueueEntry>();
-
-    private final long optionalFlushPeriod;
-    
-    /** constructor */
-    public Flusher() {
-      super();
-      this.optionalFlushPeriod = conf.getLong(
-        "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
-
-    }
-    
-    /** {@inheritDoc} */
-    @Override
-    public void run() {
-      while (!stopRequested.get()) {
-        QueueEntry e = null;
-        try {
-          e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-          if (e == null) {
-            continue;
-          }
-          synchronized(cacheFlusherLock) { // Don't interrupt while we're working
-            if (e.getRegion().flushcache()) {
-              compactSplitThread.compactionRequested(e);
-            }
-              
-            e.setExpirationTime(System.currentTimeMillis() +
-                optionalFlushPeriod);
-            flushQueue.add(e);
-          }
-          
-          // Now insure that all the active regions are in the queue
-          
-          Set<HRegion> regions = getRegionsToCheck();
-          for (HRegion r: regions) {
-            e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
-            synchronized (flushQueue) {
-              if (!flushQueue.contains(e)) {
-                flushQueue.add(e);
-              }
-            }
-          }
-
-          // Now make sure that the queue only contains active regions
-
-          synchronized (flushQueue) {
-            for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext();  ) {
-              e = i.next();
-              if (!regions.contains(e.getRegion())) {
-                i.remove();
-              }
-            }
-          }
-        } catch (InterruptedException ex) {
-          continue;
-
-        } catch (ConcurrentModificationException ex) {
-          continue;
-
-        } catch (DroppedSnapshotException ex) {
-          // Cache flush can fail in a few places.  If it fails in a critical
-          // section, we get a DroppedSnapshotException and a replay of hlog
-          // is required. Currently the only way to do this is a restart of
-          // the server.
-          LOG.fatal("Replay of hlog required. Forcing server restart", ex);
-          if (!checkFileSystem()) {
-            break;
-          }
-          HRegionServer.this.stop();
-
-        } catch (IOException ex) {
-          LOG.error("Cache flush failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
-              RemoteExceptionHandler.checkIOException(ex));
-          if (!checkFileSystem()) {
-            break;
-          }
-
-        } catch (Exception ex) {
-          LOG.error("Cache flush failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
-              ex);
-          if (!checkFileSystem()) {
-            break;
-          }
-        }
-      }
-      flushQueue.clear();
-      LOG.info(getName() + " exiting");
-    }
-    
-    /** {@inheritDoc} */
-    public void flushRequested(HRegion region) {
-      QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
-      synchronized (flushQueue) {
-        if (flushQueue.contains(e)) {
-          flushQueue.remove(e);
-        }
-        flushQueue.add(e);
-      }
-    }
-  }
-
   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected HLog log;
   final LogRoller logRoller;
   final Integer logRollerLock = new Integer(0);
   
-  /** Runs periodically to determine if the HLog should be rolled */
-  class LogRoller extends Thread implements LogRollListener {
-    private final Integer rollLock = new Integer(0);
-    private final long optionalLogRollInterval;
-    private long lastLogRollTime;
-    private volatile boolean rollLog;
-    
-    /** constructor */
-    public LogRoller() {
-      super();
-      this.optionalLogRollInterval = conf.getLong(
-          "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
-      this.rollLog = false;
-      lastLogRollTime = System.currentTimeMillis();
-    }
- 
-    /** {@inheritDoc} */
-    @Override
-    public void run() {
-      while (!stopRequested.get()) {
-        while (!rollLog && !stopRequested.get()) {
-          long now = System.currentTimeMillis();
-          if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
-            rollLog = true;
-            this.lastLogRollTime = now;
-          } else {
-            synchronized (rollLock) {
-              try {
-                rollLock.wait(threadWakeFrequency);
-
-              } catch (InterruptedException e) {
-                continue;
-              }
-            }
-          }
-        }
-        if (!rollLog) {
-          // There's only two reasons to break out of the while loop.
-          // 1. Log roll requested
-          // 2. Stop requested
-          // so if a log roll was not requested, continue and break out of loop
-          continue;
-        }
-        synchronized (logRollerLock) {
-          try {
-            LOG.info("Rolling hlog. Number of entries: " + log.getNumEntries());
-            log.rollWriter();
-            
-          } catch (IOException ex) {
-            LOG.error("Log rolling failed",
-              RemoteExceptionHandler.checkIOException(ex));
-            checkFileSystem();
-            
-          } catch (Exception ex) {
-            LOG.error("Log rolling failed", ex);
-            checkFileSystem();
-            
-          } finally {
-            rollLog = false;
-          }
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public void logRollRequested() {
-      synchronized (rollLock) {
-        rollLog = true;
-        rollLock.notifyAll();
-      }
-    }
-  }
-
   /**
    * Starts a HRegionServer at the default location
    * @param conf
@@ -624,13 +227,13 @@
       conf.getInt("hbase.master.lease.period", 30 * 1000);
 
     // Cache flushing thread.
-    this.cacheFlusher = new Flusher();
+    this.cacheFlusher = new Flusher(this);
     
     // Compaction thread
-    this.compactSplitThread = new CompactSplitThread();
+    this.compactSplitThread = new CompactSplitThread(this);
     
     // Log rolling thread
-    this.logRoller = new LogRoller();
+    this.logRoller = new LogRoller(this);
 
     // Task thread to process requests from Master
     this.worker = new Worker();
@@ -817,12 +420,8 @@
 
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
-    synchronized(cacheFlusherLock) {
-      this.cacheFlusher.interrupt();
-    }
-    synchronized (compactSplitLock) {
-      this.compactSplitThread.interrupt();
-    }
+    cacheFlusher.interruptPolitely();
+    compactSplitThread.interrupt();
     synchronized (logRollerLock) {
       this.logRoller.interrupt();
     }
@@ -1592,9 +1191,28 @@
   }
 
   /** @return the info server */
+  /**
+   * Get the InfoServer this HRegionServer has put up.
+   */
   public InfoServer getInfoServer() {
     return infoServer;
   }
+  
+  /**
+   * Check if a stop has been requested.
+   */
+  public boolean isStopRequested() {
+    return stopRequested.get();
+  }
+
+  /** Get the write lock for the server */
+  ReentrantReadWriteLock.WriteLock getWriteLock() {
+    return lock.writeLock();
+  }
+
+  void compactionRequested(QueueEntry e) {
+    compactSplitThread.compactionRequested(e);
+  }
 
   /**
    * @return Immutable list of this servers regions.
@@ -1624,6 +1242,16 @@
     return getRegion(regionName, false);
   }
   
+  /** Move a region from online to closing. */
+  void setRegionClosing(final Text regionName) {
+    retiringRegions.put(regionName, onlineRegions.remove(regionName));
+  }
+  
+  /** Set a region as closed. */
+  void setRegionClosed(final Text regionName) {
+    retiringRegions.remove(regionName);
+  }
+  
   /** 
    * Protected utility method for safely obtaining an HRegion handle.
    * @param regionName Name of online {@link HRegion} to return
@@ -1633,7 +1261,7 @@
    * @throws NotServingRegionException
    */
   protected HRegion getRegion(final Text regionName,
-      final boolean checkRetiringRegions)
+    final boolean checkRetiringRegions)
   throws NotServingRegionException {
     HRegion region = null;
     this.lock.readLock().lock();

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+
+/** Runs periodically to determine if the HLog should be rolled */
+class LogRoller extends Thread implements LogRollListener {
+  static final Log LOG = LogFactory.getLog(LogRoller.class);  
+  private final Integer rollLock = new Integer(0);
+  private final long optionalLogRollInterval;
+  private long lastLogRollTime;
+  private volatile boolean rollLog;
+  private final HRegionServer server;
+  private final HBaseConfiguration conf;
+  
+  /** constructor */
+  public LogRoller(final HRegionServer server) {
+    super();
+    this.server = server;
+    conf = server.conf;
+    this.optionalLogRollInterval = conf.getLong(
+      "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
+    this.rollLog = false;
+    lastLogRollTime = System.currentTimeMillis();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void run() {
+    while (!server.isStopRequested()) {
+      while (!rollLog && !server.isStopRequested()) {
+        long now = System.currentTimeMillis();
+        if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
+          rollLog = true;
+          this.lastLogRollTime = now;
+        } else {
+          synchronized (rollLock) {
+            try {
+              rollLock.wait(server.threadWakeFrequency);
+            } catch (InterruptedException e) {
+              continue;
+            }
+          }
+        }
+      }
+      if (!rollLog) {
+        // There's only two reasons to break out of the while loop.
+        // 1. Log roll requested
+        // 2. Stop requested
+        // so if a log roll was not requested, continue and break out of loop
+        continue;
+      }
+      synchronized (server.logRollerLock) {
+        try {
+          LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
+          server.getLog().rollWriter();
+        } catch (IOException ex) {
+          LOG.error("Log rolling failed",
+            RemoteExceptionHandler.checkIOException(ex));
+          server.checkFileSystem();
+        } catch (Exception ex) {
+          LOG.error("Log rolling failed", ex);
+          server.checkFileSystem();
+        } finally {
+          rollLog = false;
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void logRollRequested() {
+    synchronized (rollLock) {
+      rollLog = true;
+      rollLock.notifyAll();
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java?rev=630968&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java Mon Feb 25 11:50:02 2008
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Delayed;
+
+/** Queue entry passed to flusher, compactor and splitter threads */
+class QueueEntry implements Delayed {
+  private final HRegion region;
+  private long expirationTime;
+
+  QueueEntry(HRegion region, long expirationTime) {
+    this.region = region;
+    this.expirationTime = expirationTime;
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object o) {
+    QueueEntry other = (QueueEntry) o;
+    return this.hashCode() == other.hashCode();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode() {
+    return this.region.getRegionInfo().hashCode();
+  }
+
+  /** {@inheritDoc} */
+  public long getDelay(TimeUnit unit) {
+    return unit.convert(this.expirationTime - System.currentTimeMillis(),
+        TimeUnit.MILLISECONDS);
+  }
+
+  /** {@inheritDoc} */
+  public int compareTo(Delayed o) {
+    long delta = this.getDelay(TimeUnit.MILLISECONDS) -
+      o.getDelay(TimeUnit.MILLISECONDS);
+
+    int value = 0;
+    if (delta > 0) {
+      value = 1;
+      
+    } else if (delta < 0) {
+      value = -1;
+    }
+    return value;
+  }
+
+  /** @return the region */
+  public HRegion getRegion() {
+    return region;
+  }
+
+  /** @param expirationTime the expirationTime to set */
+  public void setExpirationTime(long expirationTime) {
+    this.expirationTime = expirationTime;
+  }
+}
\ No newline at end of file