You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/07/18 04:26:04 UTC

svn commit: r557118 - in /lucene/hadoop/trunk/src/contrib/hbase: CHANGES.txt src/java/org/apache/hadoop/hbase/HMaster.java src/java/org/apache/hadoop/hbase/HRegionServer.java

Author: jimk
Date: Tue Jul 17 19:26:03 2007
New Revision: 557118

URL: http://svn.apache.org/viewvc?view=rev&rev=557118
Log:
HADOOP-1615 Replacing thread notification-based queue with java.util.concurrent.BlockingQueue in HMaster, HRegionServer

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=557118&r1=557117&r2=557118
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Tue Jul 17 19:26:03 2007
@@ -65,4 +65,5 @@
  41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates
  42. HADOOP-1468 Add HBase batch update to reduce RPC overhead
  43. HADOOP-1616 Sporadic TestTable failures
-
+ 44. HADOOP-1615 Replacing thread notification-based queue with 
+     java.util.concurrent.BlockingQueue in HMaster, HRegionServer

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=557118&r1=557117&r2=557118
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Tue Jul 17 19:26:03 2007
@@ -26,7 +26,6 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -37,6 +36,9 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,7 +88,7 @@
   int numRetries;
   long maxRegionOpenTime;
   
-  LinkedList<PendingOperation> msgQueue;
+  BlockingQueue<PendingOperation> msgQueue;
   
   private Leases serverLeases;
   private Server server;
@@ -636,7 +638,7 @@
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
     this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
-    this.msgQueue = new LinkedList<PendingOperation>();
+    this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
     this.serverLeases = new Leases(
       conf.getLong("hbase.master.lease.period", 30 * 1000), 
       conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
@@ -736,18 +738,13 @@
 
     // Main processing loop
     for (PendingOperation op = null; !closed; ) {
-      synchronized(msgQueue) {
-        while(msgQueue.size() == 0 && !closed) {
-          try {
-            msgQueue.wait(threadWakeFrequency);
-          } catch(InterruptedException iex) {
-            // continue
-          }
-        }
-        if(closed) {
-          continue;
-        }
-        op = msgQueue.removeFirst();
+      try {
+        op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        // continue
+      }
+      if (op == null || closed) {
+        continue;
       }
       try {
         if (LOG.isDebugEnabled()) {
@@ -765,8 +762,10 @@
           }
         }
         LOG.warn(ex);
-        synchronized(msgQueue) {
-          msgQueue.addLast(op);
+        try {
+          msgQueue.put(op);
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Putting into msgQueue was interrupted.", e);
         }
       }
     }
@@ -874,10 +873,11 @@
     // name, then we can timeout the old one right away and register
     // the new one.
     storedInfo = serversToServerInfo.remove(s);
-    if(storedInfo != null && !closed) {
-      synchronized(msgQueue) {
-        msgQueue.addLast(new PendingServerShutdown(storedInfo));
-        msgQueue.notifyAll();
+    if (storedInfo != null && !closed) {
+      try {
+        msgQueue.put(new PendingServerShutdown(storedInfo));
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Putting into msgQueue was interrupted.", e);
       }
     }
 
@@ -1064,9 +1064,10 @@
 
           // Queue up an update to note the region location.
 
-          synchronized(msgQueue) {
-            msgQueue.addLast(new PendingOpenReport(info, region));
-            msgQueue.notifyAll();
+          try {
+            msgQueue.put(new PendingOpenReport(info, region));
+          } catch (InterruptedException e) {
+            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
           }
         }
         break;
@@ -1097,9 +1098,10 @@
           unassignedRegions.remove(region.regionName);
           assignAttempts.remove(region.regionName);
 
-          synchronized(msgQueue) {
-            msgQueue.addLast(new PendingCloseReport(region, reassignRegion, deleteRegion));
-            msgQueue.notifyAll();
+          try {
+            msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion));
+          } catch (InterruptedException e) {
+            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
           }
 
           // NOTE: we cannot put the region into unassignedRegions as that
@@ -2406,9 +2408,10 @@
             HGlobals.rootRegionInfo);
         assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
       }
-      synchronized(msgQueue) {
-        msgQueue.addLast(new PendingServerShutdown(storedInfo));
-        msgQueue.notifyAll();
+      try {
+        msgQueue.put(new PendingServerShutdown(storedInfo));
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Putting into msgQueue was interrupted.", e);
       }
     }
   }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=557118&r1=557117&r2=557118
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Tue Jul 17 19:26:03 2007
@@ -24,12 +24,14 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -449,7 +451,7 @@
     this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker);
     
     // Process requests from Master
-    this.toDo = new LinkedList<ToDoEntry>();
+    this.toDo = new LinkedBlockingQueue<ToDoEntry>();
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
 
@@ -661,7 +663,11 @@
                   if (LOG.isDebugEnabled()) {
                     LOG.debug("Got default message");
                   }
-                  toDo.addLast(new ToDoEntry(msgs[i]));
+                  try {
+                    toDo.put(new ToDoEntry(msgs[i]));
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+                  }
                 }
               }
               
@@ -828,7 +834,7 @@
       this.msg = msg;
     }
   }
-  LinkedList<ToDoEntry> toDo;
+  BlockingQueue<ToDoEntry> toDo;
   private Worker worker;
   private Thread workerThread;
   /** Thread that performs long running requests from the master */
@@ -844,26 +850,14 @@
      */
     public void run() {
       for(ToDoEntry e = null; !stopRequested; ) {
-        synchronized(toDo) {
-          while(toDo.size() == 0 && !stopRequested) {
-            try {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Wait on todo");
-              }
-              toDo.wait(threadWakeFrequency);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Wake on todo");
-              }
-            } catch(InterruptedException ex) {
-              // continue
-            }
-          }
-          if(stopRequested) {
-            continue;
-          }
-          e = toDo.removeFirst();
+        try {
+          e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+          // continue
+        }
+        if(e == null || stopRequested) {
+          continue;
         }
-        
         try {
           if (LOG.isDebugEnabled()) {
             LOG.debug(e.msg.toString());
@@ -900,8 +894,10 @@
           if(e.tries < numRetries) {
             LOG.warn(ie);
             e.tries++;
-            synchronized(toDo) {
-              toDo.addLast(e);
+            try {
+              toDo.put(e);
+            } catch (InterruptedException ex) {
+              throw new RuntimeException("Putting into msgQueue was interrupted.", ex);
             }
           } else {
             LOG.error("unable to process message: " + e.msg.toString(), ie);