You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/08/14 19:41:29 UTC
svn commit: r1513974 -
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
Author: eclark
Date: Wed Aug 14 17:41:29 2013
New Revision: 1513974
URL: http://svn.apache.org/r1513974
Log:
HBASE-9144 Leases class has contention that's not needed
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java?rev=1513974&r1=1513973&r2=1513974&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java Wed Aug 14 17:41:29 2013
@@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.util.HasT
import java.util.ConcurrentModificationException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@@ -54,10 +56,10 @@ import java.io.IOException;
@InterfaceAudience.Private
public class Leases extends HasThread {
private static final Log LOG = LogFactory.getLog(Leases.class.getName());
- private final int leaseCheckFrequency;
- private final DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
- protected final Map<String, Lease> leases = new HashMap<String, Lease>();
- private volatile boolean stopRequested = false;
+ private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
+
+ protected final int leaseCheckFrequency;
+ protected volatile boolean stopRequested = false;
/**
* Creates a lease monitor
@@ -71,14 +73,22 @@ public class Leases extends HasThread {
}
/**
- * @see java.lang.Thread#run()
+ * @see Thread#run()
*/
@Override
public void run() {
- while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
- Lease lease = null;
+ long toWait = leaseCheckFrequency;
+ Lease nextLease = null;
+ long nextLeaseDelay = Long.MAX_VALUE;
+
+ while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
+
try {
- lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
+ if (nextLease != null) {
+ toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
+ }
+ toWait = Math.min(leaseCheckFrequency, toWait);
+ Thread.sleep(toWait);
} catch (InterruptedException e) {
continue;
} catch (ConcurrentModificationException e) {
@@ -87,18 +97,28 @@ public class Leases extends HasThread {
LOG.fatal("Unexpected exception killed leases thread", e);
break;
}
- if (lease == null) {
- continue;
- }
- // A lease expired. Run the expired code before removing from queue
- // since its presence in queue is used to see if lease exists still.
- if (lease.getListener() == null) {
- LOG.error("lease listener is null for lease " + lease.getLeaseName());
- } else {
- lease.getListener().leaseExpired();
- }
- synchronized (leaseQueue) {
- leases.remove(lease.getLeaseName());
+
+ nextLease = null;
+ nextLeaseDelay = Long.MAX_VALUE;
+ for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, Lease> entry = it.next();
+ Lease lease = entry.getValue();
+ long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
+ if ( thisLeaseDelay > 0) {
+ if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
+ nextLease = lease;
+ nextLeaseDelay = thisLeaseDelay;
+ }
+ } else {
+ // A lease expired. Run the expired code before removing from map
+ // since its presence in map is used to see if lease exists still.
+ if (lease.getListener() == null) {
+ LOG.error("lease listener is null for lease " + lease.getLeaseName());
+ } else {
+ lease.getListener().leaseExpired();
+ }
+ it.remove();
+ }
}
}
close();
@@ -122,17 +142,13 @@ public class Leases extends HasThread {
public void close() {
LOG.info(Thread.currentThread().getName() + " closing leases");
this.stopRequested = true;
- synchronized (leaseQueue) {
- leaseQueue.clear();
- leases.clear();
- leaseQueue.notifyAll();
- }
+ leases.clear();
LOG.info(Thread.currentThread().getName() + " closed leases");
}
/**
* Obtain a lease.
- *
+ *
* @param leaseName name of the lease
* @param leaseTimeoutPeriod length of the lease in milliseconds
* @param listener listener that will process lease expirations
@@ -153,61 +169,34 @@ public class Leases extends HasThread {
return;
}
lease.resetExpirationTime();
- synchronized (leaseQueue) {
- if (leases.containsKey(lease.getLeaseName())) {
- throw new LeaseStillHeldException(lease.getLeaseName());
- }
- leases.put(lease.getLeaseName(), lease);
- leaseQueue.add(lease);
- }
- }
-
- /**
- * Thrown if we are asked create a lease but lease on passed name already
- * exists.
- */
- @SuppressWarnings("serial")
- public static class LeaseStillHeldException extends IOException {
- private final String leaseName;
-
- /**
- * @param name
- */
- public LeaseStillHeldException(final String name) {
- this.leaseName = name;
- }
-
- /** @return name of lease */
- public String getName() {
- return this.leaseName;
+ if (leases.containsKey(lease.getLeaseName())) {
+ throw new LeaseStillHeldException(lease.getLeaseName());
}
+ leases.put(lease.getLeaseName(), lease);
}
/**
* Renew a lease
*
* @param leaseName name of lease
- * @throws org.apache.hadoop.hbase.regionserver.LeaseException
+ * @throws LeaseException
*/
public void renewLease(final String leaseName) throws LeaseException {
- synchronized (leaseQueue) {
- Lease lease = leases.get(leaseName);
- // We need to check to see if the remove is successful as the poll in the run()
- // method could have completed between the get and the remove which will result
- // in a corrupt leaseQueue.
- if (lease == null || !leaseQueue.remove(lease)) {
- throw new LeaseException("lease '" + leaseName +
- "' does not exist or has already expired");
- }
- lease.resetExpirationTime();
- leaseQueue.add(lease);
+ Lease lease = leases.get(leaseName);
+ // We need to check to see if the remove is successful as the poll in the run()
+ // method could have completed between the get and the remove which will result
+ // in a corrupt leaseQueue.
+ if (lease == null ) {
+ throw new LeaseException("lease '" + leaseName +
+ "' does not exist or has already expired");
}
+ lease.resetExpirationTime();
}
/**
* Client explicitly cancels a lease.
* @param leaseName name of lease
- * @throws LeaseException
+ * @throws org.apache.hadoop.hbase.regionserver.LeaseException
*/
public void cancelLease(final String leaseName) throws LeaseException {
removeLease(leaseName);
@@ -219,21 +208,38 @@ public class Leases extends HasThread {
* Lease can be resinserted using {@link #addLease(Lease)}
*
* @param leaseName name of lease
- * @throws LeaseException
+ * @throws org.apache.hadoop.hbase.regionserver.LeaseException
* @return Removed lease
*/
Lease removeLease(final String leaseName) throws LeaseException {
- Lease lease = null;
- synchronized (leaseQueue) {
- lease = leases.remove(leaseName);
- if (lease == null) {
- throw new LeaseException("lease '" + leaseName + "' does not exist");
- }
- leaseQueue.remove(lease);
+ Lease lease = leases.remove(leaseName);
+ if (lease == null) {
+ throw new LeaseException("lease '" + leaseName + "' does not exist");
}
return lease;
}
+ /**
+ * Thrown if we are asked create a lease but lease on passed name already
+ * exists.
+ */
+ @SuppressWarnings("serial")
+ public static class LeaseStillHeldException extends IOException {
+ private final String leaseName;
+
+ /**
+ * @param name
+ */
+ public LeaseStillHeldException(final String name) {
+ this.leaseName = name;
+ }
+
+ /** @return name of lease */
+ public String getName() {
+ return this.leaseName;
+ }
+ }
+
/** This class tracks a single Lease. */
static class Lease implements Delayed {
private final String leaseName;