You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/08 13:40:29 UTC

hbase git commit: HBASE-18778 Use Comparator for StealJobQueue

Repository: hbase
Updated Branches:
  refs/heads/master e69b05d10 -> 331910192


HBASE-18778 Use Comparator for StealJobQueue


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/33191019
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/33191019
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/33191019

Branch: refs/heads/master
Commit: 331910192af158aac33f883ae132fba444dda003
Parents: e69b05d
Author: zhangduo <zh...@apache.org>
Authored: Fri Sep 8 18:12:04 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Sep 8 21:27:12 2017 +0800

----------------------------------------------------------------------
 .../hbase/master/cleaner/HFileCleaner.java      | 50 +++++-------
 .../hadoop/hbase/regionserver/CompactSplit.java | 83 +++++++++++++++-----
 .../apache/hadoop/hbase/regionserver/Store.java |  3 +-
 .../compactions/CompactionRequest.java          | 46 ++---------
 .../apache/hadoop/hbase/util/StealJobQueue.java | 20 +++--
 .../hadoop/hbase/util/TestStealJobQueue.java    |  5 +-
 6 files changed, 108 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 5f80e81..6f952f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -26,13 +27,13 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.StealJobQueue;
@@ -111,7 +112,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
         conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
     smallQueueInitSize =
         conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
-    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
     smallFileQueue = largeFileQueue.getStealFromQueue();
     largeFileDeleteThreadNumber =
         conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
@@ -299,7 +300,21 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     }
   }
 
-  static class HFileDeleteTask implements Comparable<HFileDeleteTask> {
+  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
+
+    @Override
+    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
+      // larger file first so reverse compare
+      int cmp = Long.compare(o2.fileLength, o1.fileLength);
+      if (cmp != 0) {
+        return cmp;
+      }
+      // just use hashCode to generate a stable result.
+      return System.identityHashCode(o1) - System.identityHashCode(o2);
+    }
+  };
+
+  private static final class HFileDeleteTask {
     private static final long MAX_WAIT = 60 * 1000L;
     private static final long WAIT_UNIT = 1000L;
 
@@ -341,31 +356,6 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       }
       return this.result;
     }
-
-    @Override
-    public int compareTo(HFileDeleteTask o) {
-      long sub = this.fileLength - o.fileLength;
-      // smaller value with higher priority in PriorityQueue, and we intent to delete the larger
-      // file first.
-      return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || !(o instanceof HFileDeleteTask)) {
-        return false;
-      }
-      HFileDeleteTask otherTask = (HFileDeleteTask) o;
-      return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
-    }
-
-    @Override
-    public int hashCode() {
-      return filePath.hashCode();
-    }
   }
 
   @VisibleForTesting
@@ -414,7 +404,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     for (HFileDeleteTask task : smallFileQueue) {
       leftOverTasks.add(task);
     }
-    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
     smallFileQueue = largeFileQueue.getStealFromQueue();
     threads.clear();
     startHFileDeleteThreads();

http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index c0fc741..11cbf0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -114,7 +115,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
 
     final String n = Thread.currentThread().getName();
 
-    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
+    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
         60, TimeUnit.SECONDS, stealJobQueue,
         new ThreadFactory() {
@@ -424,9 +425,60 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     return this.regionSplitLimit;
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
-      justification="Contrived use of compareTo")
-  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
+  private static final Comparator<Runnable> COMPARATOR =
+      new Comparator<Runnable>() {
+
+    private int compare(CompactionRequest r1, CompactionRequest r2) {
+      if (r1 == r2) {
+        return 0; //they are the same request
+      }
+      // less first
+      int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
+      if (cmp != 0) {
+        return cmp;
+      }
+      cmp = Long.compare(r1.getSelectionNanoTime(), r2.getSelectionNanoTime());
+      if (cmp != 0) {
+        return cmp;
+      }
+
+      // break the tie based on hash code
+      return System.identityHashCode(r1) - System.identityHashCode(r2);
+    }
+
+    @Override
+    public int compare(Runnable r1, Runnable r2) {
+      // CompactionRunner first
+      if (r1 instanceof CompactionRunner) {
+        if (!(r2 instanceof CompactionRunner)) {
+          return -1;
+        }
+      } else {
+        if (r2 instanceof CompactionRunner) {
+          return 1;
+        } else {
+          // break the tie based on hash code
+          return System.identityHashCode(r1) - System.identityHashCode(r2);
+        }
+      }
+      CompactionRunner o1 = (CompactionRunner) r1;
+      CompactionRunner o2 = (CompactionRunner) r2;
+      // less first
+      int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
+      if (cmp != 0) {
+        return cmp;
+      }
+      CompactionContext c1 = o1.compaction;
+      CompactionContext c2 = o2.compaction;
+      if (c1 == null) {
+        return c2 == null ? 0 : 1;
+      } else {
+        return c2 == null ? -1 : compare(c1.getRequest(), c2.getRequest());
+      }
+    }
+  };
+
+  private final class CompactionRunner implements Runnable {
     private final Store store;
     private final HRegion region;
     private CompactionContext compaction;
@@ -435,17 +487,17 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     private User user;
     private long time;
 
-    public CompactionRunner(Store store, Region region,
-        CompactionContext compaction, ThreadPoolExecutor parent, User user) {
+    public CompactionRunner(Store store, Region region, CompactionContext compaction,
+        ThreadPoolExecutor parent, User user) {
       super();
       this.store = store;
-      this.region = (HRegion)region;
+      this.region = (HRegion) region;
       this.compaction = compaction;
-      this.queuedPriority = (this.compaction == null)
-          ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.queuedPriority =
+          compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority();
       this.parent = parent;
       this.user = user;
-      this.time =  System.currentTimeMillis();
+      this.time = System.currentTimeMillis();
     }
 
     @Override
@@ -554,17 +606,6 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
       pw.flush();
       return sw.toString();
     }
-
-    @Override
-    public int compareTo(CompactionRunner o) {
-      // Only compare the underlying request (if any), for queue sorting purposes.
-      int compareVal = queuedPriority - o.queuedPriority; // compare priority
-      if (compareVal != 0) return compareVal;
-      CompactionContext tc = this.compaction, oc = o.compaction;
-      // Sort pre-selected (user?) compactions before system ones with equal priority.
-      return (tc == null) ? ((oc == null) ? 0 : 1)
-          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index fd9de9b..932c0c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -53,7 +53,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
 
   /* The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions. (Pri <= 0)
-   */ int PRIORITY_USER = 1;
+   */
+  int PRIORITY_USER = 1;
   int NO_PRIORITY = Integer.MIN_VALUE;
 
   // General Accessors

http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index 340b780..127fc14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
  */
 @InterfaceAudience.LimitedPrivate({ "coprocessor" })
 @InterfaceStability.Evolving
-public class CompactionRequest implements Comparable<CompactionRequest> {
+public class CompactionRequest {
 
   // was this compaction promoted to an off-peak
   private boolean isOffPeak = false;
@@ -49,7 +49,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
   // CompactRequest object creation time.
   private long selectionTime;
   // System time used to compare objects in FIFO order. TODO: maybe use selectionTime?
-  private Long timeInNanos;
+  private long timeInNanos;
   private String regionName = "";
   private String storeName = "";
   private long totalSize = -1L;
@@ -71,6 +71,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
 
   public void updateFiles(Collection<StoreFile> files) {
     this.filesToCompact = files;
+    recalculateSize();
   }
 
   /**
@@ -104,43 +105,6 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
     return this;
   }
 
-  /**
-   * This function will define where in the priority queue the request will
-   * end up.  Those with the highest priorities will be first.  When the
-   * priorities are the same it will first compare priority then date
-   * to maintain a FIFO functionality.
-   *
-   * <p>Note: The enqueue timestamp is accurate to the nanosecond. if two
-   * requests have same timestamp then this function will break the tie
-   * arbitrarily with hashCode() comparing.
-   */
-  @Override
-  public int compareTo(CompactionRequest request) {
-    //NOTE: The head of the priority queue is the least element
-    if (this.equals(request)) {
-      return 0; //they are the same request
-    }
-    int compareVal;
-
-    compareVal = priority - request.priority; //compare priority
-    if (compareVal != 0) {
-      return compareVal;
-    }
-
-    compareVal = timeInNanos.compareTo(request.timeInNanos);
-    if (compareVal != 0) {
-      return compareVal;
-    }
-
-    // break the tie based on hash code
-    return this.hashCode() - request.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return (this == obj);
-  }
-
   public Collection<StoreFile> getFiles() {
     return this.filesToCompact;
   }
@@ -189,6 +153,10 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
     return this.selectionTime;
   }
 
+  public long getSelectionNanoTime() {
+    return this.timeInNanos;
+  }
+
   /**
    * Specify if this compaction should be a major compaction based on the state of the store
    * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major

http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
index 5e7e232..00f7cfb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
+import java.util.Comparator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -41,13 +42,17 @@ import java.util.concurrent.locks.ReentrantLock;
 @InterfaceAudience.Private
 public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
 
+  private static final long serialVersionUID = -6334572230936888291L;
+
   private BlockingQueue<T> stealFromQueue;
 
   private final Lock lock = new ReentrantLock();
-  private final Condition notEmpty = lock.newCondition();
+  private final transient Condition notEmpty = lock.newCondition();
+
+  public StealJobQueue(Comparator<? super T> comparator) {
+    this.stealFromQueue = new PriorityBlockingQueue<T>(11, comparator) {
 
-  public StealJobQueue() {
-    this.stealFromQueue = new PriorityBlockingQueue<T>() {
+      private static final long serialVersionUID = -7070010365201826904L;
 
       @Override
       public boolean offer(T t) {
@@ -62,9 +67,12 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
     };
   }
 
-  public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
-    super(initCapacity);
-    this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) {
+  public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
+      Comparator<? super T> comparator) {
+    super(initCapacity, comparator);
+    this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) {
+
+      private static final long serialVersionUID = -6805567216580184701L;
 
       @Override
       public boolean offer(T t) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
index 54fdaca..22c9f6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
@@ -42,7 +42,7 @@ public class TestStealJobQueue {
 
   @Before
   public void setup() {
-    stealJobQueue = new StealJobQueue<>();
+    stealJobQueue = new StealJobQueue<>(Integer::compare);
     stealFromQueue = stealJobQueue.getStealFromQueue();
 
   }
@@ -170,7 +170,8 @@ public class TestStealJobQueue {
 
   @Test
   public void testInteractWithThreadPool() throws InterruptedException {
-    StealJobQueue<Runnable> stealTasksQueue = new StealJobQueue<>();
+    StealJobQueue<Runnable> stealTasksQueue =
+        new StealJobQueue<>((r1, r2) -> ((TestTask) r1).compareTo((TestTask) r2));
     final CountDownLatch stealJobCountDown = new CountDownLatch(3);
     final CountDownLatch stealFromCountDown = new CountDownLatch(3);
     ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) {