You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/05/23 23:30:26 UTC
svn commit: r541095 - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/
Author: jimk
Date: Wed May 23 14:30:25 2007
New Revision: 541095
URL: http://svn.apache.org/viewvc?view=rev&rev=541095
Log:
HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix regression introduced by HADOOP-1397.
Added:
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed May 23 14:30:25 2007
@@ -14,3 +14,5 @@
'Performance Evaluation', etc.
7. HADOOP-1420, HADOOP-1423. Findbugs changes, remove reference to removed
class HLocking.
+ 8. HADOOP-1424. TestHBaseCluster fails with IllegalMonitorStateException. Fix
+ regression introduced by HADOOP-1397.
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java?view=auto&rev=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java Wed May 23 14:30:25 2007
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * HLocking is a set of lock primitives that does not rely on a
+ * particular thread holding the monitor for an object. This is
+ * especially important when a lock must persist over multiple RPC's
+ * since there is no guarantee that the same Server thread will handle
+ * all the RPC's until the lock is released. Not requiring that the locker
+ * thread is same as unlocking thread is the key distinction between this
+ * class and {@link java.util.concurrent.locks.ReentrantReadWriteLock}.
+ *
+ * <p>For each independent entity that needs locking, create a new HLocking
+ * instance.
+ */
+public class HLocking {
+ private Integer mutex;
+
+ // If lockers == 0, the lock is unlocked
+ // If lockers > 0, locked for read
+ // If lockers == -1 locked for write
+
+ private AtomicInteger lockers;
+
+ /** Constructor */
+ public HLocking() {
+ this.mutex = new Integer(0);
+ this.lockers = new AtomicInteger(0);
+ }
+
+ /**
+ * Caller needs the nonexclusive read-lock
+ */
+ public void obtainReadLock() {
+ synchronized(mutex) {
+ while(lockers.get() < 0) {
+ try {
+ mutex.wait();
+ } catch(InterruptedException ie) {
+ }
+ }
+ lockers.incrementAndGet();
+ mutex.notifyAll();
+ }
+ }
+
+ /**
+ * Caller is finished with the nonexclusive read-lock
+ */
+ public void releaseReadLock() {
+ synchronized(mutex) {
+ if(lockers.decrementAndGet() < 0) {
+ throw new IllegalStateException("lockers: " + lockers);
+ }
+ mutex.notifyAll();
+ }
+ }
+
+ /**
+ * Caller needs the exclusive write-lock
+ */
+ public void obtainWriteLock() {
+ synchronized(mutex) {
+ while(!lockers.compareAndSet(0, -1)) {
+ try {
+ mutex.wait();
+ } catch (InterruptedException ie) {
+ }
+ }
+ mutex.notifyAll();
+ }
+ }
+
+ /**
+ * Caller is finished with the write lock
+ */
+ public void releaseWriteLock() {
+ synchronized(mutex) {
+ if(!lockers.compareAndSet(-1, 0)) {
+ throw new IllegalStateException("lockers: " + lockers);
+ }
+ mutex.notifyAll();
+ }
+ }
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Wed May 23 14:30:25 2007
@@ -15,14 +15,17 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.hadoop.io.*;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
/*******************************************************************************
* The HMemcache holds in-memory modifications to the HRegion. This is really a
@@ -39,7 +42,7 @@
TreeMap<HStoreKey, BytesWritable> snapshot = null;
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final HLocking lock = new HLocking();
public HMemcache() {
super();
@@ -70,7 +73,7 @@
public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot retval = new Snapshot();
- this.lock.writeLock().lock();
+ this.lock.obtainWriteLock();
try {
if(snapshot != null) {
throw new IOException("Snapshot in progress!");
@@ -99,7 +102,7 @@
return retval;
} finally {
- this.lock.writeLock().unlock();
+ this.lock.releaseWriteLock();
}
}
@@ -109,7 +112,7 @@
* Modifying the structure means we need to obtain a writelock.
*/
public void deleteSnapshot() throws IOException {
- this.lock.writeLock().lock();
+ this.lock.obtainWriteLock();
try {
if(snapshot == null) {
@@ -135,7 +138,7 @@
}
} finally {
- this.lock.writeLock().unlock();
+ this.lock.releaseWriteLock();
}
}
@@ -145,14 +148,14 @@
* Operation uses a write lock.
*/
public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
- this.lock.writeLock().lock();
+ this.lock.obtainWriteLock();
try {
for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) {
HStoreKey key = new HStoreKey(row, es.getKey(), timestamp);
memcache.put(key, es.getValue());
}
} finally {
- this.lock.writeLock().unlock();
+ this.lock.releaseWriteLock();
}
}
@@ -163,7 +166,7 @@
*/
public BytesWritable[] get(HStoreKey key, int numVersions) {
Vector<BytesWritable> results = new Vector<BytesWritable>();
- this.lock.readLock().lock();
+ this.lock.obtainReadLock();
try {
Vector<BytesWritable> result = get(memcache, key, numVersions-results.size());
results.addAll(0, result);
@@ -180,7 +183,7 @@
return (results.size() == 0)?
null: results.toArray(new BytesWritable[results.size()]);
} finally {
- this.lock.readLock().unlock();
+ this.lock.releaseReadLock();
}
}
@@ -192,7 +195,7 @@
*/
public TreeMap<Text, BytesWritable> getFull(HStoreKey key) {
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
- this.lock.readLock().lock();
+ this.lock.obtainReadLock();
try {
internalGetFull(memcache, key, results);
for(int i = history.size()-1; i >= 0; i--) {
@@ -202,7 +205,7 @@
return results;
} finally {
- this.lock.readLock().unlock();
+ this.lock.releaseReadLock();
}
}
@@ -275,7 +278,7 @@
super(timestamp, targetCols);
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
this.backingMaps = new TreeMap[history.size() + 1];
@@ -367,7 +370,7 @@
}
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
scannerClosed = true;
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed May 23 14:30:25 2007
@@ -23,7 +23,6 @@
import java.io.*;
import java.util.*;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@@ -283,7 +282,7 @@
int maxUnflushedEntries = 0;
int compactionThreshold = 0;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final HLocking lock = new HLocking();
//////////////////////////////////////////////////////////////////////////////
// Constructor
@@ -398,7 +397,7 @@
* time-sensitive thread.
*/
public Vector<HStoreFile> close() throws IOException {
- lock.writeLock().lock();
+ lock.obtainWriteLock();
try {
boolean shouldClose = false;
synchronized(writestate) {
@@ -438,7 +437,7 @@
}
}
} finally {
- lock.writeLock().unlock();
+ lock.releaseWriteLock();
}
}
@@ -614,7 +613,7 @@
* @return - true if the region should be split
*/
public boolean needsSplit(Text midKey) {
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
Text key = new Text();
@@ -632,7 +631,7 @@
return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
}
}
@@ -641,7 +640,7 @@
*/
public boolean needsCompaction() {
boolean needsCompaction = false;
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
if(i.next().getNMaps() > compactionThreshold) {
@@ -650,7 +649,7 @@
}
}
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
}
return needsCompaction;
}
@@ -670,7 +669,7 @@
*/
public boolean compactStores() throws IOException {
boolean shouldCompact = false;
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
synchronized(writestate) {
if((! writestate.writesOngoing)
@@ -683,32 +682,30 @@
}
}
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
}
if(! shouldCompact) {
LOG.info("not compacting region " + this.regionInfo.regionName);
- return false;
-
- } else {
- lock.writeLock().lock();
- try {
- LOG.info("starting compaction on region " + this.regionInfo.regionName);
- for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
- HStore store = it.next();
- store.compact();
- }
- LOG.info("compaction completed on region " + this.regionInfo.regionName);
- return true;
-
- } finally {
- synchronized(writestate) {
- writestate.writesOngoing = false;
- recentCommits = 0;
- writestate.notifyAll();
- }
- lock.writeLock().unlock();
+ return false;
+ }
+ lock.obtainWriteLock();
+ try {
+ LOG.info("starting compaction on region " + this.regionInfo.regionName);
+ for (Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+ HStore store = it.next();
+ store.compact();
+ }
+ LOG.info("compaction completed on region " + this.regionInfo.regionName);
+ return true;
+
+ } finally {
+ synchronized (writestate) {
+ writestate.writesOngoing = false;
+ recentCommits = 0;
+ writestate.notifyAll();
}
+ lock.releaseWriteLock();
}
}
@@ -928,7 +925,7 @@
private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
// Check the memcache
@@ -948,7 +945,7 @@
return targetStore.get(key, numVersions);
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
}
}
@@ -965,7 +962,7 @@
public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
@@ -976,7 +973,7 @@
return memResult;
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
}
}
@@ -985,7 +982,7 @@
* columns. This Iterator must be closed by the caller.
*/
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException {
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
TreeSet<Text> families = new TreeSet<Text>();
for(int i = 0; i < cols.length; i++) {
@@ -1001,7 +998,7 @@
return new HScanner(cols, firstRow, memcache, storelist);
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
}
}
@@ -1024,11 +1021,11 @@
// We obtain a per-row lock, so other clients will
// block while one client performs an update.
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
return obtainLock(row);
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=541095&r1=541094&r2=541095
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed May 23 14:30:25 2007
@@ -23,7 +23,6 @@
import java.util.Random;
import java.util.TreeMap;
import java.util.Vector;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -64,7 +63,7 @@
Integer compactLock = 0;
Integer flushLock = 0;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final HLocking lock = new HLocking();
TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@@ -237,7 +236,7 @@
/** Turn off all the MapFile readers */
public void close() throws IOException {
LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
- this.lock.writeLock().lock();
+ this.lock.obtainWriteLock();
try {
for (MapFile.Reader map: maps.values()) {
map.close();
@@ -247,7 +246,7 @@
LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
} finally {
- this.lock.writeLock().unlock();
+ this.lock.releaseWriteLock();
}
}
@@ -319,7 +318,7 @@
// C. Finally, make the new MapFile available.
if(addToAvailableMaps) {
- this.lock.writeLock().lock();
+ this.lock.obtainWriteLock();
try {
maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
@@ -330,7 +329,7 @@
}
} finally {
- this.lock.writeLock().unlock();
+ this.lock.releaseWriteLock();
}
}
return getAllMapFiles();
@@ -338,12 +337,12 @@
}
public Vector<HStoreFile> getAllMapFiles() {
- this.lock.readLock().lock();
+ this.lock.obtainReadLock();
try {
return new Vector<HStoreFile>(mapFiles.values());
} finally {
- this.lock.readLock().unlock();
+ this.lock.releaseReadLock();
}
}
@@ -385,12 +384,12 @@
// Grab a list of files to compact.
Vector<HStoreFile> toCompactFiles = null;
- this.lock.writeLock().lock();
+ this.lock.obtainWriteLock();
try {
toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
} finally {
- this.lock.writeLock().unlock();
+ this.lock.releaseWriteLock();
}
// Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
@@ -627,7 +626,7 @@
Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
- this.lock.writeLock().lock();
+ this.lock.obtainWriteLock();
try {
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
if(! fs.exists(doneFile)) {
@@ -744,7 +743,7 @@
// 7. Releasing the write-lock
- this.lock.writeLock().unlock();
+ this.lock.releaseWriteLock();
}
}
@@ -760,7 +759,7 @@
* The returned object should map column names to byte arrays (byte[]).
*/
public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
- this.lock.readLock().lock();
+ this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -789,7 +788,7 @@
}
} finally {
- this.lock.readLock().unlock();
+ this.lock.releaseReadLock();
}
}
@@ -805,7 +804,7 @@
}
Vector<BytesWritable> results = new Vector<BytesWritable>();
- this.lock.readLock().lock();
+ this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -846,7 +845,7 @@
}
} finally {
- this.lock.readLock().unlock();
+ this.lock.releaseReadLock();
}
}
@@ -862,7 +861,7 @@
return maxSize;
}
- this.lock.readLock().lock();
+ this.lock.obtainReadLock();
try {
long mapIndex = 0L;
@@ -889,7 +888,7 @@
LOG.warn(e);
} finally {
- this.lock.readLock().unlock();
+ this.lock.releaseReadLock();
}
return maxSize;
}
@@ -898,12 +897,12 @@
* @return Returns the number of map files currently in use
*/
public int getNMaps() {
- this.lock.readLock().lock();
+ this.lock.obtainReadLock();
try {
return maps.size();
} finally {
- this.lock.readLock().unlock();
+ this.lock.releaseReadLock();
}
}
@@ -945,7 +944,7 @@
super(timestamp, targetCols);
- lock.readLock().lock();
+ lock.obtainReadLock();
try {
this.readers = new MapFile.Reader[mapFiles.size()];
@@ -1060,7 +1059,7 @@
}
} finally {
- lock.readLock().unlock();
+ lock.releaseReadLock();
scannerClosed = true;
}
}