You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/05/23 23:30:26 UTC

svn commit: r541095 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/

Author: jimk
Date: Wed May 23 14:30:25 2007
New Revision: 541095

URL: http://svn.apache.org/viewvc?view=rev&rev=541095
Log:
HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix regression introduced by HADOOP-1397.

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed May 23 14:30:25 2007
@@ -14,3 +14,5 @@
      'Performance Evaluation', etc.
   7. HADOOP-1420, HADOOP-1423. Findbugs changes, remove reference to removed 
      class HLocking.
+  8. HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix
+     regression introduced by HADOOP-1397.

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java?view=auto&rev=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java Wed May 23 14:30:25 2007
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * HLocking is a set of lock primitives that does not rely on a
+ * particular thread holding the monitor for an object. This is
+ * especially important when a lock must persist over multiple RPC's
+ * since there is no guarantee that the same Server thread will handle
+ * all the RPC's until the lock is released.  Not requiring that the locker
+ * thread is same as unlocking thread is the key distinction between this
+ * class and {@link java.util.concurrent.locks.ReentrantReadWriteLock}. 
+ *
+ * <p>For each independent entity that needs locking, create a new HLocking
+ * instance.
+ */
+public class HLocking {
+  private Integer mutex;
+  
+  // If lockers == 0, the lock is unlocked
+  // If lockers > 0, locked for read
+  // If lockers == -1 locked for write
+  
+  private AtomicInteger lockers;
+  
+  /** Constructor */
+  public HLocking() {
+    this.mutex = new Integer(0);
+    this.lockers = new AtomicInteger(0);
+  }
+
+  /**
+   * Caller needs the nonexclusive read-lock
+   */
+  public void obtainReadLock() {
+    synchronized(mutex) {
+      while(lockers.get() < 0) {
+        try {
+          mutex.wait();
+        } catch(InterruptedException ie) {
+        }
+      }
+      lockers.incrementAndGet();
+      mutex.notifyAll();
+    }
+  }
+
+  /**
+   * Caller is finished with the nonexclusive read-lock
+   */
+  public void releaseReadLock() {
+    synchronized(mutex) {
+      if(lockers.decrementAndGet() < 0) {
+        throw new IllegalStateException("lockers: " + lockers);
+      }
+      mutex.notifyAll();
+    }
+  }
+
+  /**
+   * Caller needs the exclusive write-lock
+   */
+  public void obtainWriteLock() {
+    synchronized(mutex) {
+      while(!lockers.compareAndSet(0, -1)) {
+        try {
+          mutex.wait();
+        } catch (InterruptedException ie) {
+        }
+      }
+      mutex.notifyAll();
+    }
+  }
+
+  /**
+   * Caller is finished with the write lock
+   */
+  public void releaseWriteLock() {
+    synchronized(mutex) {
+      if(!lockers.compareAndSet(-1, 0)) {
+        throw new IllegalStateException("lockers: " + lockers);
+      }
+      mutex.notifyAll();
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Wed May 23 14:30:25 2007
@@ -15,14 +15,17 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.io.*;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
 
 /*******************************************************************************
  * The HMemcache holds in-memory modifications to the HRegion.  This is really a
@@ -39,7 +42,7 @@
   
   TreeMap<HStoreKey, BytesWritable> snapshot = null;
 
-  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final HLocking lock = new HLocking();
 
   public HMemcache() {
     super();
@@ -70,7 +73,7 @@
   public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
     Snapshot retval = new Snapshot();
 
-    this.lock.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       if(snapshot != null) {
         throw new IOException("Snapshot in progress!");
@@ -99,7 +102,7 @@
       return retval;
       
     } finally {
-      this.lock.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -109,7 +112,7 @@
    * Modifying the structure means we need to obtain a writelock.
    */
   public void deleteSnapshot() throws IOException {
-    this.lock.writeLock().lock();
+    this.lock.obtainWriteLock();
 
     try {
       if(snapshot == null) {
@@ -135,7 +138,7 @@
       }
       
     } finally {
-      this.lock.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -145,14 +148,14 @@
    * Operation uses a write lock.
    */
   public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
-    this.lock.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) {
         HStoreKey key = new HStoreKey(row, es.getKey(), timestamp);
         memcache.put(key, es.getValue());
       }
     } finally {
-      this.lock.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -163,7 +166,7 @@
    */
   public BytesWritable[] get(HStoreKey key, int numVersions) {
     Vector<BytesWritable> results = new Vector<BytesWritable>();
-    this.lock.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       Vector<BytesWritable> result = get(memcache, key, numVersions-results.size());
       results.addAll(0, result);
@@ -180,7 +183,7 @@
       return (results.size() == 0)?
         null: results.toArray(new BytesWritable[results.size()]);
     } finally {
-      this.lock.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
   
@@ -192,7 +195,7 @@
    */
   public TreeMap<Text, BytesWritable> getFull(HStoreKey key) {
     TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
-    this.lock.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       internalGetFull(memcache, key, results);
       for(int i = history.size()-1; i >= 0; i--) {
@@ -202,7 +205,7 @@
       return results;
       
     } finally {
-      this.lock.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
   
@@ -275,7 +278,7 @@
       
       super(timestamp, targetCols);
       
-      lock.readLock().lock();
+      lock.obtainReadLock();
       try {
         this.backingMaps = new TreeMap[history.size() + 1];
         
@@ -367,7 +370,7 @@
           }
           
         } finally {
-          lock.readLock().unlock();
+          lock.releaseReadLock();
           scannerClosed = true;
         }
       }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed May 23 14:30:25 2007
@@ -23,7 +23,6 @@
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
@@ -283,7 +282,7 @@
 
   int maxUnflushedEntries = 0;
   int compactionThreshold = 0;
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final HLocking lock = new HLocking();
 
   //////////////////////////////////////////////////////////////////////////////
   // Constructor
@@ -398,7 +397,7 @@
    * time-sensitive thread.
    */
   public Vector<HStoreFile> close() throws IOException {
-    lock.writeLock().lock();
+    lock.obtainWriteLock();
     try {
       boolean shouldClose = false;
       synchronized(writestate) {
@@ -438,7 +437,7 @@
         }
       }
     } finally {
-      lock.writeLock().unlock();
+      lock.releaseWriteLock();
     }
   }
 
@@ -614,7 +613,7 @@
    * @return            - true if the region should be split
    */
   public boolean needsSplit(Text midKey) {
-    lock.readLock().lock();
+    lock.obtainReadLock();
 
     try {
       Text key = new Text();
@@ -632,7 +631,7 @@
       return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
       
     } finally {
-      lock.readLock().unlock();
+      lock.releaseReadLock();
     }
   }
 
@@ -641,7 +640,7 @@
    */
   public boolean needsCompaction() {
     boolean needsCompaction = false;
-    lock.readLock().lock();
+    lock.obtainReadLock();
     try {
       for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
         if(i.next().getNMaps() > compactionThreshold) {
@@ -650,7 +649,7 @@
         }
       }
     } finally {
-      lock.readLock().unlock();
+      lock.releaseReadLock();
     }
     return needsCompaction;
   }
@@ -670,7 +669,7 @@
    */
   public boolean compactStores() throws IOException {
     boolean shouldCompact = false;
-    lock.readLock().lock();
+    lock.obtainReadLock();
     try {
       synchronized(writestate) {
         if((! writestate.writesOngoing)
@@ -683,32 +682,30 @@
         }
       }
     } finally {
-      lock.readLock().unlock();
+      lock.releaseReadLock();
     }
 
     if(! shouldCompact) {
       LOG.info("not compacting region " + this.regionInfo.regionName);
-      return false;
-      
-    } else {
-      lock.writeLock().lock();
-      try {
-        LOG.info("starting compaction on region " + this.regionInfo.regionName);
-        for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
-          HStore store = it.next();
-          store.compact();
-        }
-        LOG.info("compaction completed on region " + this.regionInfo.regionName);
-        return true;
-        
-      } finally {
-        synchronized(writestate) {
-          writestate.writesOngoing = false;
-          recentCommits = 0;
-          writestate.notifyAll();
-        }
-        lock.writeLock().unlock();
+      return false; 
+    }
+    lock.obtainWriteLock();
+    try {
+      LOG.info("starting compaction on region " + this.regionInfo.regionName);
+      for (Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+        HStore store = it.next();
+        store.compact();
+      }
+      LOG.info("compaction completed on region " + this.regionInfo.regionName);
+      return true;
+
+    } finally {
+      synchronized (writestate) {
+        writestate.writesOngoing = false;
+        recentCommits = 0;
+        writestate.notifyAll();
       }
+      lock.releaseWriteLock();
     }
   }
 
@@ -928,7 +925,7 @@
 
   private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
 
-    lock.readLock().lock();
+    lock.obtainReadLock();
     try {
       // Check the memcache
 
@@ -948,7 +945,7 @@
       return targetStore.get(key, numVersions);
       
     } finally {
-      lock.readLock().unlock();
+      lock.releaseReadLock();
     }
   }
 
@@ -965,7 +962,7 @@
   public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
     HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
 
-    lock.readLock().lock();
+    lock.obtainReadLock();
     try {
       TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
       for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
@@ -976,7 +973,7 @@
       return memResult;
       
     } finally {
-      lock.readLock().unlock();
+      lock.releaseReadLock();
     }
   }
 
@@ -985,7 +982,7 @@
    * columns.  This Iterator must be closed by the caller.
    */
   public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException {
-    lock.readLock().lock();
+    lock.obtainReadLock();
     try {
       TreeSet<Text> families = new TreeSet<Text>();
       for(int i = 0; i < cols.length; i++) {
@@ -1001,7 +998,7 @@
       return new HScanner(cols, firstRow, memcache, storelist);
       
     } finally {
-      lock.readLock().unlock();
+      lock.releaseReadLock();
     }
   }
 
@@ -1024,11 +1021,11 @@
     // We obtain a per-row lock, so other clients will
     // block while one client performs an update.
 
-    lock.readLock().lock();
+    lock.obtainReadLock();
     try {
       return obtainLock(row);
     } finally {
-      lock.readLock().unlock();
+      lock.releaseReadLock();
     }
   }
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed May 23 14:30:25 2007
@@ -23,7 +23,6 @@
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.Vector;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,7 +63,7 @@
   Integer compactLock = 0;
   Integer flushLock = 0;
 
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final HLocking lock = new HLocking();
 
   TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
   TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@@ -237,7 +236,7 @@
   /** Turn off all the MapFile readers */
   public void close() throws IOException {
     LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
-    this.lock.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       for (MapFile.Reader map: maps.values()) {
         map.close();
@@ -247,7 +246,7 @@
       
       LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
     } finally {
-      this.lock.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -319,7 +318,7 @@
       // C. Finally, make the new MapFile available.
 
       if(addToAvailableMaps) {
-        this.lock.writeLock().lock();
+        this.lock.obtainWriteLock();
         
         try {
           maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
@@ -330,7 +329,7 @@
           }
         
         } finally {
-          this.lock.writeLock().unlock();
+          this.lock.releaseWriteLock();
         }
       }
       return getAllMapFiles();
@@ -338,12 +337,12 @@
   }
 
   public Vector<HStoreFile> getAllMapFiles() {
-    this.lock.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       return new Vector<HStoreFile>(mapFiles.values());
       
     } finally {
-      this.lock.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
 
@@ -385,12 +384,12 @@
         // Grab a list of files to compact.
         
         Vector<HStoreFile> toCompactFiles = null;
-        this.lock.writeLock().lock();
+        this.lock.obtainWriteLock();
         try {
           toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
           
         } finally {
-          this.lock.writeLock().unlock();
+          this.lock.releaseWriteLock();
         }
 
         // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
@@ -627,7 +626,7 @@
 
 
     Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
-    this.lock.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
       if(! fs.exists(doneFile)) {
@@ -744,7 +743,7 @@
       
       // 7. Releasing the write-lock
       
-      this.lock.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
@@ -760,7 +759,7 @@
    * The returned object should map column names to byte arrays (byte[]).
    */
   public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
-    this.lock.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       MapFile.Reader[] maparray 
         = maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -789,7 +788,7 @@
       }
       
     } finally {
-      this.lock.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
 
@@ -805,7 +804,7 @@
     }
     
     Vector<BytesWritable> results = new Vector<BytesWritable>();
-    this.lock.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       MapFile.Reader[] maparray 
         = maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -846,7 +845,7 @@
       }
       
     } finally {
-      this.lock.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
   
@@ -862,7 +861,7 @@
       return maxSize;
     }
     
-    this.lock.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       long mapIndex = 0L;
 
@@ -889,7 +888,7 @@
       LOG.warn(e);
 
     } finally {
-      this.lock.readLock().unlock();
+      this.lock.releaseReadLock();
     }
     return maxSize;
   }
@@ -898,12 +897,12 @@
    * @return    Returns the number of map files currently in use
    */
   public int getNMaps() {
-    this.lock.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       return maps.size();
       
     } finally {
-      this.lock.readLock().unlock();
+      this.lock.releaseReadLock();
     }
   }
   
@@ -945,7 +944,7 @@
       
       super(timestamp, targetCols);
 
-      lock.readLock().lock();
+      lock.obtainReadLock();
       try {
         this.readers = new MapFile.Reader[mapFiles.size()];
         
@@ -1060,7 +1059,7 @@
           }
           
         } finally {
-          lock.readLock().unlock();
+          lock.releaseReadLock();
           scannerClosed = true;
         }
       }