You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/08/14 19:41:29 UTC

svn commit: r1513974 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java

Author: eclark
Date: Wed Aug 14 17:41:29 2013
New Revision: 1513974

URL: http://svn.apache.org/r1513974
Log:
HBASE-9144 Leases class has contention that's not needed

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java?rev=1513974&r1=1513973&r2=1513974&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java Wed Aug 14 17:41:29 2013
@@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.util.HasT
 
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
@@ -54,10 +56,10 @@ import java.io.IOException;
 @InterfaceAudience.Private
 public class Leases extends HasThread {
   private static final Log LOG = LogFactory.getLog(Leases.class.getName());
-  private final int leaseCheckFrequency;
-  private final DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
-  protected final Map<String, Lease> leases = new HashMap<String, Lease>();
-  private volatile boolean stopRequested = false;
+  private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
+
+  protected final int leaseCheckFrequency;
+  protected volatile boolean stopRequested = false;
 
   /**
    * Creates a lease monitor
@@ -71,14 +73,22 @@ public class Leases extends HasThread {
   }
 
   /**
-   * @see java.lang.Thread#run()
+   * @see Thread#run()
    */
   @Override
   public void run() {
-    while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
-      Lease lease = null;
+    long toWait = leaseCheckFrequency;
+    Lease nextLease = null;
+    long nextLeaseDelay = Long.MAX_VALUE;
+
+    while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
+
       try {
-        lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
+        if (nextLease != null) {
+          toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
+        }
+        toWait = Math.min(leaseCheckFrequency, toWait);
+        Thread.sleep(toWait);
       } catch (InterruptedException e) {
         continue;
       } catch (ConcurrentModificationException e) {
@@ -87,18 +97,28 @@ public class Leases extends HasThread {
         LOG.fatal("Unexpected exception killed leases thread", e);
         break;
       }
-      if (lease == null) {
-        continue;
-      }
-      // A lease expired.  Run the expired code before removing from queue
-      // since its presence in queue is used to see if lease exists still.
-      if (lease.getListener() == null) {
-        LOG.error("lease listener is null for lease " + lease.getLeaseName());
-      } else {
-        lease.getListener().leaseExpired();
-      }
-      synchronized (leaseQueue) {
-        leases.remove(lease.getLeaseName());
+
+      nextLease = null;
+      nextLeaseDelay = Long.MAX_VALUE;
+      for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<String, Lease> entry = it.next();
+        Lease lease = entry.getValue();
+        long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
+        if ( thisLeaseDelay > 0) {
+          if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
+            nextLease = lease;
+            nextLeaseDelay = thisLeaseDelay;
+          }
+        } else {
+          // A lease expired.  Run the expired code before removing from map
+          // since its presence in map is used to see if lease exists still.
+          if (lease.getListener() == null) {
+            LOG.error("lease listener is null for lease " + lease.getLeaseName());
+          } else {
+            lease.getListener().leaseExpired();
+          }
+          it.remove();
+        }
       }
     }
     close();
@@ -122,17 +142,13 @@ public class Leases extends HasThread {
   public void close() {
     LOG.info(Thread.currentThread().getName() + " closing leases");
     this.stopRequested = true;
-    synchronized (leaseQueue) {
-      leaseQueue.clear();
-      leases.clear();
-      leaseQueue.notifyAll();
-    }
+    leases.clear();
     LOG.info(Thread.currentThread().getName() + " closed leases");
   }
 
   /**
    * Obtain a lease.
-   * 
+   *
    * @param leaseName name of the lease
    * @param leaseTimeoutPeriod length of the lease in milliseconds
    * @param listener listener that will process lease expirations
@@ -153,61 +169,34 @@ public class Leases extends HasThread {
       return;
     }
     lease.resetExpirationTime();
-    synchronized (leaseQueue) {
-      if (leases.containsKey(lease.getLeaseName())) {
-        throw new LeaseStillHeldException(lease.getLeaseName());
-      }
-      leases.put(lease.getLeaseName(), lease);
-      leaseQueue.add(lease);
-    }
-  }
-
-  /**
-   * Thrown if we are asked create a lease but lease on passed name already
-   * exists.
-   */
-  @SuppressWarnings("serial")
-  public static class LeaseStillHeldException extends IOException {
-    private final String leaseName;
-
-    /**
-     * @param name
-     */
-    public LeaseStillHeldException(final String name) {
-      this.leaseName = name;
-    }
-
-    /** @return name of lease */
-    public String getName() {
-      return this.leaseName;
+    if (leases.containsKey(lease.getLeaseName())) {
+      throw new LeaseStillHeldException(lease.getLeaseName());
     }
+    leases.put(lease.getLeaseName(), lease);
   }
 
   /**
    * Renew a lease
    *
    * @param leaseName name of lease
-   * @throws org.apache.hadoop.hbase.regionserver.LeaseException
+   * @throws LeaseException
    */
   public void renewLease(final String leaseName) throws LeaseException {
-    synchronized (leaseQueue) {
-      Lease lease = leases.get(leaseName);
-      // We need to check to see if the remove is successful as the poll in the run()
-      // method could have completed between the get and the remove which will result
-      // in a corrupt leaseQueue.
-      if (lease == null || !leaseQueue.remove(lease)) {
-        throw new LeaseException("lease '" + leaseName +
-        "' does not exist or has already expired");
-      }
-      lease.resetExpirationTime();
-      leaseQueue.add(lease);
+    Lease lease = leases.get(leaseName);
+    // run() drops expired leases from the map, so a missing entry means the
+    // lease either never existed or has already expired. No queue has to be
+    // updated here; resetting the expiration time is enough to renew.
+    if (lease == null ) {
+      throw new LeaseException("lease '" + leaseName +
+          "' does not exist or has already expired");
     }
+    lease.resetExpirationTime();
   }
 
   /**
    * Client explicitly cancels a lease.
    * @param leaseName name of lease
-   * @throws LeaseException
+   * @throws org.apache.hadoop.hbase.regionserver.LeaseException
    */
   public void cancelLease(final String leaseName) throws LeaseException {
     removeLease(leaseName);
@@ -219,21 +208,38 @@ public class Leases extends HasThread {
    * Lease can be resinserted using {@link #addLease(Lease)}
    *
    * @param leaseName name of lease
-   * @throws LeaseException
+   * @throws org.apache.hadoop.hbase.regionserver.LeaseException
    * @return Removed lease
    */
   Lease removeLease(final String leaseName) throws LeaseException {
-    Lease lease =  null;
-    synchronized (leaseQueue) {
-      lease = leases.remove(leaseName);
-      if (lease == null) {
-        throw new LeaseException("lease '" + leaseName + "' does not exist");
-      }
-      leaseQueue.remove(lease);
+    Lease lease = leases.remove(leaseName);
+    if (lease == null) {
+      throw new LeaseException("lease '" + leaseName + "' does not exist");
     }
     return lease;
   }
 
+  /**
+   * Thrown if we are asked to create a lease but a lease with the passed name
+   * already exists.
+   */
+  @SuppressWarnings("serial")
+  public static class LeaseStillHeldException extends IOException {
+    private final String leaseName;
+
+    /**
+     * @param name name of the lease that is already held
+     */
+    public LeaseStillHeldException(final String name) {
+      this.leaseName = name;
+    }
+
+    /** @return name of lease */
+    public String getName() {
+      return this.leaseName;
+    }
+  }
+
   /** This class tracks a single Lease. */
   static class Lease implements Delayed {
     private final String leaseName;