Posted to commits@hbase.apache.org by ap...@apache.org on 2018/12/14 00:15:04 UTC

[36/50] [abbrv] hbase git commit: HBASE-20558 Port HBASE-17854 (Use StealJobQueue in HFileCleaner after HBASE-17215) to branch-1

HBASE-20558 Port HBASE-17854 (Use StealJobQueue in HFileCleaner after HBASE-17215) to branch-1

The third port commit of HBASE-20555

Signed-off-by: Zach York <zy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/976f07e8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/976f07e8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/976f07e8

Branch: refs/heads/branch-1.3
Commit: 976f07e87cbeb35b80a7ca8eea46f973e932bf83
Parents: 30b1dc0
Author: TAK LON WU <wu...@amazon.com>
Authored: Mon Jul 9 16:34:06 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 12 18:08:20 2018 -0800

----------------------------------------------------------------------
 .../hbase/master/cleaner/HFileCleaner.java      | 98 +++++++++++++-------
 .../apache/hadoop/hbase/util/StealJobQueue.java | 29 +++++-
 .../hbase/master/cleaner/TestHFileCleaner.java  | 28 +++---
 .../hadoop/hbase/util/TestStealJobQueue.java    | 14 +--
 4 files changed, 113 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
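
In short, this change swaps the two fixed-capacity LinkedBlockingQueues in HFileCleaner
for a StealJobQueue pair, so the thread draining the large-file queue can pick up
small-file deletion tasks once its own queue runs dry. A minimal sketch of that behavior,
using only the StealJobQueue API visible in the diff below (the class and variable names
in the sketch are illustrative, not part of the commit):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hbase.util.StealJobQueue;

    public class StealJobQueueSketch {
      public static void main(String[] args) throws InterruptedException {
        // the two-argument constructor sets the initial capacities of both queues
        StealJobQueue<String> largeQueue = new StealJobQueue<>(16, 16);
        BlockingQueue<String> smallQueue = largeQueue.getStealFromQueue();

        // only a "small" task is queued...
        smallQueue.offer("small-file-task");

        // ...yet the consumer of the large queue can still make progress by stealing it
        String task = largeQueue.poll(1, TimeUnit.SECONDS);
        System.out.println("large-queue consumer got: " + task);
      }
    }

Because both queues are now backed by unbounded PriorityBlockingQueues, offer() in
dispatch() can no longer fail due to a full queue, which is what the new "should never
arrive here" comments below refer to.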


http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index defe851..70548b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.StealJobQueue;
 
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
@@ -56,23 +56,23 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       "hbase.regionserver.thread.hfilecleaner.throttle";
   public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
 
-  // Configuration key for large queue size
-  public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+  // Configuration key for large queue initial size
+  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
       "hbase.regionserver.hfilecleaner.large.queue.size";
-  public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
 
-  // Configuration key for small queue size
-  public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+  // Configuration key for small queue initial size
+  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
       "hbase.regionserver.hfilecleaner.small.queue.size";
-  public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
 
   private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
 
-  BlockingQueue<HFileDeleteTask> largeFileQueue;
+  StealJobQueue<HFileDeleteTask> largeFileQueue;
   BlockingQueue<HFileDeleteTask> smallFileQueue;
   private int throttlePoint;
-  private int largeQueueSize;
-  private int smallQueueSize;
+  private int largeQueueInitSize;
+  private int smallQueueInitSize;
   private List<Thread> threads = new ArrayList<Thread>();
   private boolean running;
 
@@ -93,12 +93,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       directory, MASTER_HFILE_CLEANER_PLUGINS, params);
     throttlePoint =
         conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
-    largeQueueSize =
-        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
-    smallQueueSize =
-        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
-    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
-    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    largeQueueInitSize =
+        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+    smallQueueInitSize =
+        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+    smallFileQueue = largeFileQueue.getStealFromQueue();
     startHFileDeleteThreads();
   }
 
@@ -151,6 +151,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   private boolean dispatch(HFileDeleteTask task) {
     if (task.fileLength >= this.throttlePoint) {
       if (!this.largeFileQueue.offer(task)) {
+        // should never arrive here as long as we use PriorityQueue
         if (LOG.isTraceEnabled()) {
           LOG.trace("Large file deletion queue is full");
         }
@@ -158,6 +159,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       }
     } else {
       if (!this.smallFileQueue.offer(task)) {
+        // should never arrive here as long as we use PriorityQueue
         if (LOG.isTraceEnabled()) {
           LOG.trace("Small file deletion queue is full");
         }
@@ -231,7 +233,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
           }
           task.setResult(succeed);
           if (succeed) {
-            countDeletedFiles(queue == largeFileQueue);
+            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
           }
         }
       }
@@ -243,8 +245,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   }
 
   // Currently only for testing purpose
-  private void countDeletedFiles(boolean isLarge) {
-    if (isLarge) {
+  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
+    if (isLargeFile) {
       if (deletedLargeFiles == Long.MAX_VALUE) {
         LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
         deletedLargeFiles = 0L;
@@ -255,6 +257,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
         LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
         deletedSmallFiles = 0L;
       }
+      if (fromLargeQueue && LOG.isTraceEnabled()) {
+        LOG.trace("Stolen a small file deletion task in large file thread");
+      }
       deletedSmallFiles++;
     }
   }
@@ -272,7 +277,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     }
   }
 
-  static class HFileDeleteTask {
+  static class HFileDeleteTask implements Comparable<HFileDeleteTask> {
     private static final long MAX_WAIT = 60 * 1000L;
     private static final long WAIT_UNIT = 1000L;
 
@@ -314,6 +319,31 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
       }
       return this.result;
     }
+
+    @Override
+    public int compareTo(HFileDeleteTask o) {
+      long sub = this.fileLength - o.fileLength;
+      // smaller value with higher priority in PriorityQueue, and we intend to delete the larger
+      // file first.
+      return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || !(o instanceof HFileDeleteTask)) {
+        return false;
+      }
+      HFileDeleteTask otherTask = (HFileDeleteTask) o;
+      return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
+    }
+
+    @Override
+    public int hashCode() {
+      return filePath.hashCode();
+    }
   }
 
   @VisibleForTesting
@@ -332,13 +362,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   }
 
   @VisibleForTesting
-  public long getLargeQueueSize() {
-    return largeQueueSize;
+  public long getLargeQueueInitSize() {
+    return largeQueueInitSize;
   }
 
   @VisibleForTesting
-  public long getSmallQueueSize() {
-    return smallQueueSize;
+  public long getSmallQueueInitSize() {
+    return smallQueueInitSize;
   }
 
   @VisibleForTesting
@@ -350,15 +380,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
   public void onConfigurationChange(Configuration conf) {
     StringBuilder builder = new StringBuilder();
     builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
-        .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
-        .append(", smallQueueSize: ").append(smallQueueSize);
+        .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize)
+        .append(", smallQueueInitSize: ").append(smallQueueInitSize);
     stopHFileDeleteThreads();
     this.throttlePoint =
         conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
-    this.largeQueueSize =
-        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
-    this.smallQueueSize =
-        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    this.largeQueueInitSize =
+        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+    this.smallQueueInitSize =
+        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
     // record the left over tasks
     List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
     for (HFileDeleteTask task : largeFileQueue) {
@@ -367,11 +397,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
     for (HFileDeleteTask task : smallFileQueue) {
       leftOverTasks.add(task);
     }
-    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
-    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
+    smallFileQueue = largeFileQueue.getStealFromQueue();
     threads.clear();
-    builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ")
-        .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
+    builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ")
+        .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize);
     LOG.debug(builder.toString());
     startHFileDeleteThreads();
     // re-dispatch the left over tasks
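
The Comparable implementation added above makes HFileDeleteTask ordering size-based:
larger files sort first, so the PriorityBlockingQueue-backed StealJobQueue hands out the
biggest deletions earliest. A small, self-contained illustration of that ordering (Task
here is a stand-in for HFileDeleteTask, not the HBase class):

    import java.util.concurrent.PriorityBlockingQueue;

    public class DeleteTaskOrderingSketch {
      // stand-in for HFileDeleteTask, keeping only the field compareTo() looks at
      static class Task implements Comparable<Task> {
        final String path;
        final long fileLength;
        Task(String path, long fileLength) { this.path = path; this.fileLength = fileLength; }
        @Override
        public int compareTo(Task o) {
          long sub = this.fileLength - o.fileLength;
          // same logic as the diff: a larger file compares as "smaller", i.e. higher priority
          return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
        }
        @Override
        public String toString() { return path + " (" + fileLength + " bytes)"; }
      }

      public static void main(String[] args) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        queue.offer(new Task("small.hfile", 1000L));
        queue.offer(new Task("large.hfile", 100000000L));
        queue.offer(new Task("medium.hfile", 5000000L));
        // prints large, medium, small: the cleaner works on the largest files first
        while (!queue.isEmpty()) {
          System.out.println(queue.poll());
        }
      }
    }

Note that compareTo() orders only by file length while equals()/hashCode() also look at
the file path; the priority queue relies only on the ordering, so the two can differ
without affecting queue behavior.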

http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
index 74f0747..c6d8ee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +25,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
 /**
  * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor.
  * This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the
@@ -48,6 +48,24 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
 
   public StealJobQueue() {
     this.stealFromQueue = new PriorityBlockingQueue<T>() {
+
+      @Override
+      public boolean offer(T t) {
+        lock.lock();
+        try {
+          notEmpty.signal();
+          return super.offer(t);
+        } finally {
+          lock.unlock();
+        }
+      }
+    };
+  }
+
+  public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
+    super(initCapacity);
+    this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) {
+
       @Override
       public boolean offer(T t) {
         lock.lock();
@@ -61,6 +79,10 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
     };
   }
 
+  /**
+   * Get a queue whose jobs might be stolen by the consumer of this original queue
+   * @return the queue whose jobs could be stolen
+   */
   public BlockingQueue<T> getStealFromQueue() {
     return stealFromQueue;
   }
@@ -108,8 +130,9 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
           retVal = stealFromQueue.poll();
         }
         if (retVal == null) {
-          if (nanos <= 0)
+          if (nanos <= 0) {
             return null;
+          }
           nanos = notEmpty.awaitNanos(nanos);
         } else {
           return retVal;
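
One subtle point in the StealJobQueue changes above: the anonymous stealFromQueue
overrides offer() to signal the shared notEmpty condition, so a consumer blocked in the
main queue's timed poll() wakes up as soon as a stealable job arrives instead of waiting
out its timeout. A hedged sketch of that hand-off (class name invented for the example):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hbase.util.StealJobQueue;

    public class StealWakeupSketch {
      public static void main(String[] args) throws InterruptedException {
        final StealJobQueue<Integer> primary = new StealJobQueue<>(8, 8);
        BlockingQueue<Integer> stealFrom = primary.getStealFromQueue();

        Thread consumer = new Thread(new Runnable() {
          @Override
          public void run() {
            try {
              // blocks until a job shows up in either queue, or the timeout expires
              Integer job = primary.poll(10, TimeUnit.SECONDS);
              System.out.println("consumer got: " + job);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        });
        consumer.start();

        Thread.sleep(500);   // let the consumer block inside poll()
        stealFrom.offer(42); // offer() signals notEmpty, waking the blocked consumer
        consumer.join();
      }
    }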

http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 5712729..18afafa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -316,9 +316,9 @@ public class TestHFileCleaner {
   public void testOnConfigurationChange() throws Exception {
     // constants
     final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
-    final int ORIGINAL_QUEUE_SIZE = 512;
+    final int ORIGINAL_QUEUE_INIT_SIZE = 512;
     final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
-    final int UPDATE_QUEUE_SIZE = 1024;
+    final int UPDATE_QUEUE_INIT_SIZE = 1024;
     final int LARGE_FILE_NUM = 5;
     final int SMALL_FILE_NUM = 20;
 
@@ -326,8 +326,8 @@ public class TestHFileCleaner {
     // no cleaner policies = delete all files
     conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
     conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
-    conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
-    conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
+    conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
     Server server = new DummyServer();
     Path archivedHfileDir =
         new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
@@ -336,8 +336,8 @@ public class TestHFileCleaner {
     FileSystem fs = UTIL.getDFSCluster().getFileSystem();
     final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
     Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
-    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
-    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+    Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
 
     // clean up archive directory and create files for testing
     fs.delete(archivedHfileDir, true);
@@ -353,22 +353,24 @@ public class TestHFileCleaner {
     };
     t.setDaemon(true);
     t.start();
-    // let the cleaner run for some while
-    Thread.sleep(20);
+    // wait until file clean started
+    while (cleaner.getNumOfDeletedSmallFiles() == 0) {
+      Thread.yield();
+    }
 
     // trigger configuration change
     Configuration newConf = new Configuration(conf);
     newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
-    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
-    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
-    cleaner.onConfigurationChange(newConf);
+    newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
+    newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
     LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
         + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+    cleaner.onConfigurationChange(newConf);
 
     // check values after change
     Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
-    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
-    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
+    Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
     Assert.assertEquals(2, cleaner.getCleanerThreads().size());
 
     // wait until clean done and check
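
The test update above also replaces a fixed 20 ms sleep with a loop that yields until the
cleaner has actually deleted a small file, removing a timing assumption from the test. If
a bounded wait were preferred over an open-ended yield loop, a hedged sketch of a helper
(name and timeout are illustrative, not part of the commit or of the HBase test utilities)
could look like this:

    // Illustrative bounded-wait helper: mirrors the new yield loop in the test but fails
    // with a clear error instead of spinning forever if the condition is never met.
    public final class WaitUtil {
      public interface Condition {
        boolean isMet();
      }

      public static void waitFor(long timeoutMs, Condition condition) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.isMet()) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(10);
        }
      }

      private WaitUtil() {
      }
    }

    // hypothetical usage in the test:
    //   WaitUtil.waitFor(30000L, new WaitUtil.Condition() {
    //     public boolean isMet() { return cleaner.getNumOfDeletedSmallFiles() > 0; }
    //   });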

http://git-wip-us.apache.org/repos/asf/hbase/blob/976f07e8/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
index 59a4115..531e4f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java
@@ -18,10 +18,9 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -29,14 +28,17 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.*;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 
 @Category(SmallTests.class)
 public class TestStealJobQueue {
 
   StealJobQueue<Integer> stealJobQueue;
-  BlockingQueue stealFromQueue;
+  BlockingQueue<Integer> stealFromQueue;
 
   @Before
   public void setup() {