Posted to commits@hbase.apache.org by ap...@apache.org on 2019/05/01 00:29:40 UTC

[hbase] branch branch-2 updated: HBASE-21883 Enhancements to Major Compaction tool

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 38c3178  HBASE-21883 Enhancements to Major Compaction tool
38c3178 is described below

commit 38c3178c18fd0b4535f17ce4a46c8145d4a781e5
Author: Thiruvel Thirumoolan <th...@oath.com>
AuthorDate: Tue Mar 26 19:43:02 2019 -0700

    HBASE-21883 Enhancements to Major Compaction tool
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../hbase/rsgroup/RSGroupMajorCompactionTTL.java   | 131 ++++++++++
 .../rsgroup/TestRSGroupMajorCompactionTTL.java     | 106 ++++++++
 .../util/compaction/MajorCompactionRequest.java    | 101 ++++----
 .../util/compaction/MajorCompactionTTLRequest.java | 109 ++++++++
 .../hbase/util/compaction/MajorCompactor.java      | 281 ++++++++++++++++-----
 .../hbase/util/compaction/MajorCompactorTTL.java   | 175 +++++++++++++
 .../compaction/TestMajorCompactionRequest.java     |  59 +++--
 .../compaction/TestMajorCompactionTTLRequest.java  | 100 ++++++++
 .../hbase/util/compaction/TestMajorCompactor.java  |   4 +-
 ...orCompactor.java => TestMajorCompactorTTL.java} |  93 ++++---
 10 files changed, 988 insertions(+), 171 deletions(-)
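
For orientation: the compactor now implements the standard Hadoop Tool interface, so the
new TTL variant can be launched programmatically as well as from the command line. A
minimal sketch under assumed values ("usertable" and the flag values are placeholders,
not part of this commit; the class sits in the tool's package because the no-arg
constructor is protected):

    package org.apache.hadoop.hbase.util.compaction;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;

    public class LaunchTTLCompaction {
      public static void main(String[] args) throws Exception {
        // -servers is the worker concurrency; -skipWait fires compactions without polling
        int rc = ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(),
            new String[] { "-table", "usertable", "-servers", "2", "-skipWait" });
        System.exit(rc); // non-zero means some regions failed to compact
      }
    }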

diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
new file mode 100644
index 0000000..d1b3751
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * This tool takes an rsgroup as argument and major compacts part/all of the regions of
+ * the tables in that rsgroup, based on each table's TTL.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class RSGroupMajorCompactionTTL extends MajorCompactorTTL {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RSGroupMajorCompactionTTL.class);
+
+  @VisibleForTesting
+  RSGroupMajorCompactionTTL() {
+    super();
+  }
+
+  public int compactTTLRegionsOnGroup(Configuration conf, String rsgroup, int concurrency,
+      long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
+      throws Exception {
+
+    Connection conn = ConnectionFactory.createConnection(conf);
+    RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);
+
+    RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(rsgroup);
+    if (rsGroupInfo == null) {
+      LOG.error("Invalid rsgroup specified: " + rsgroup);
+      throw new IllegalArgumentException("Invalid rsgroup specified: " + rsgroup);
+    }
+
+    for (TableName tableName : rsGroupInfo.getTables()) {
+      int status = compactRegionsTTLOnTable(conf, tableName.getNameAsString(), concurrency, sleep,
+          numServers, numRegions, dryRun, skipWait);
+      if (status != 0) {
+        LOG.error("Failed to compact table: " + tableName);
+        return status;
+      }
+    }
+    return 0;
+  }
+
+  protected Options getOptions() {
+    Options options = getCommonOptions();
+
+    options.addOption(
+        Option.builder("rsgroup")
+            .required()
+            .desc("Tables of rsgroup to be compacted")
+            .hasArg()
+            .build()
+    );
+
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getOptions();
+
+    final CommandLineParser cmdLineParser = new DefaultParser();
+    CommandLine commandLine;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.out.println(
+          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+              + parseException);
+      printUsage(options);
+      return -1;
+    }
+    if (commandLine == null) {
+      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
+      printUsage(options);
+      return -1;
+    }
+
+    String rsgroup = commandLine.getOptionValue("rsgroup");
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+    boolean dryRun = commandLine.hasOption("dryRun");
+    boolean skipWait = commandLine.hasOption("skipWait");
+    Configuration conf = getConf();
+
+    return compactTTLRegionsOnGroup(conf, rsgroup, concurrency, sleep, numServers, numRegions,
+        dryRun, skipWait);
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new RSGroupMajorCompactionTTL(), args);
+  }
+}
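
The rsgroup tool can also be driven directly from Java. A sketch under assumed values
("my_group" is a placeholder rsgroup name; the numeric arguments mirror the CLI defaults;
the caller must live in the same package since the no-arg constructor is package-private):

    Configuration conf = HBaseConfiguration.create();
    int status = new RSGroupMajorCompactionTTL().compactTTLRegionsOnGroup(
        conf, "my_group", 1 /* concurrency */, 30000 /* sleep ms */,
        -1 /* numServers: all */, -1 /* numRegions: all */,
        false /* dryRun */, false /* skipWait */);
    // status is non-zero if any table in the group failed to compact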
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java
new file mode 100644
index 0000000..9b3dcbb
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.compaction.TestMajorCompactorTTL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRSGroupMajorCompactionTTL.class);
+
+  private final static int NUM_SLAVES_BASE = 6;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    Configuration conf = utility.getConfiguration();
+    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName());
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
+    conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster(NUM_SLAVES_BASE);
+    MiniHBaseCluster cluster = utility.getHBaseCluster();
+    final HMaster master = cluster.getMaster();
+
+    // wait for balancer to come online
+    utility.waitFor(60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() {
+        return master.isInitialized() &&
+            ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+      }
+    });
+    admin = utility.getAdmin();
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCompactingTables() throws Exception {
+    List<TableName> tableNames = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      tableNames.add(createTable(name.getMethodName() + "___" + i));
+    }
+
+    // Delay a bit, so the data written above is older than the 5 second TTL we set below
+    Thread.sleep(10 * 1000);
+
+    for (TableName tableName : tableNames) {
+      int numberOfRegions = admin.getRegions(tableName).size();
+      int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+      // before major compaction, each table should have more store files than regions
+      assertTrue(numberOfRegions < numHFiles);
+      modifyTTL(tableName);
+    }
+
+    RSGroupMajorCompactionTTL compactor = new RSGroupMajorCompactionTTL();
+    compactor.compactTTLRegionsOnGroup(utility.getConfiguration(),
+        RSGroupInfo.DEFAULT_GROUP, 1, 200, -1, -1, false, false);
+
+    for (TableName tableName : tableNames) {
+      int numberOfRegions = admin.getRegions(tableName).size();
+      int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+      assertEquals(numberOfRegions, numHFiles);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
index 51b2b9d..cf5fcd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -44,25 +44,27 @@ class MajorCompactionRequest {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
 
-  private final Configuration configuration;
-  private final RegionInfo region;
+  protected final Configuration configuration;
+  protected final RegionInfo region;
   private Set<String> stores;
-  private final long timestamp;
 
-  @VisibleForTesting
-  MajorCompactionRequest(Configuration configuration, RegionInfo region,
-      Set<String> stores, long timestamp) {
+  MajorCompactionRequest(Configuration configuration, RegionInfo region) {
     this.configuration = configuration;
     this.region = region;
+  }
+
+  @VisibleForTesting
+  MajorCompactionRequest(Configuration configuration, RegionInfo region,
+      Set<String> stores) {
+    this(configuration, region);
     this.stores = stores;
-    this.timestamp = timestamp;
   }
 
   static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
       Set<String> stores, long timestamp) throws IOException {
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, info, stores, timestamp);
-    return request.createRequest(configuration, stores);
+        new MajorCompactionRequest(configuration, info, stores);
+    return request.createRequest(configuration, stores, timestamp);
   }
 
   RegionInfo getRegion() {
@@ -79,67 +81,80 @@ class MajorCompactionRequest {
 
   @VisibleForTesting
   Optional<MajorCompactionRequest> createRequest(Configuration configuration,
-      Set<String> stores) throws IOException {
-    Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+      Set<String> stores, long timestamp) throws IOException {
+    Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
     MajorCompactionRequest request = null;
     if (!familiesToCompact.isEmpty()) {
-      request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+      request = new MajorCompactionRequest(configuration, region, familiesToCompact);
     }
     return Optional.ofNullable(request);
   }
 
-  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
+      throws IOException {
     try(Connection connection = getConnection(configuration)) {
       HRegionFileSystem fileSystem = getFileSystem(connection);
       Set<String> familiesToCompact = Sets.newHashSet();
       for (String family : requestedStores) {
-        // do we have any store files?
-        Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
-        if (storeFiles == null) {
-          LOG.info("Excluding store: " + family + " for compaction for region:  " + fileSystem
-              .getRegionInfo().getEncodedName(), " has no store files");
-          continue;
-        }
-        // check for reference files
-        if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+        if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
           familiesToCompact.add(family);
-          LOG.info("Including store: " + family + " with: " + storeFiles.size()
-              + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
-          continue;
-        }
-        // check store file timestamps
-        boolean includeStore = false;
-        for (StoreFileInfo storeFile : storeFiles) {
-          if (storeFile.getModificationTime() < timestamp) {
-            LOG.info("Including store: " + family + " with: " + storeFiles.size()
-                + " files for compaction for region: "
-                + fileSystem.getRegionInfo().getEncodedName());
-            familiesToCompact.add(family);
-            includeStore = true;
-            break;
-          }
-        }
-        if (!includeStore) {
-          LOG.info("Excluding store: " + family + " for compaction for region:  " + fileSystem
-              .getRegionInfo().getEncodedName(), " already compacted");
         }
       }
       return familiesToCompact;
     }
   }
 
+  boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
+      throws IOException {
+
+    // do we have any store files?
+    Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+    if (storeFiles == null) {
+      LOG.info("Excluding store: " + family + " for compaction for region:  " + fileSystem
+          .getRegionInfo().getEncodedName(), " has no store files");
+      return false;
+    }
+    // check for reference files
+    if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family, ts)) {
+      LOG.info("Including store: " + family + " with: " + storeFiles.size()
+          + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
+      return true;
+    }
+    // check store file timestamps
+    boolean includeStore = this.shouldIncludeStore(fileSystem, family, storeFiles, ts);
+    if (!includeStore) {
+      LOG.info("Excluding store: " + family + " for compaction for region:  " + fileSystem
+          .getRegionInfo().getEncodedName() + " already compacted");
+    }
+    return includeStore;
+  }
+
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+    for (StoreFileInfo storeFile : storeFiles) {
+      if (storeFile.getModificationTime() < ts) {
+        LOG.info("Including store: " + family + " with: " + storeFiles.size()
+            + " files for compaction for region: "
+            + fileSystem.getRegionInfo().getEncodedName());
+        return true;
+      }
+    }
+    return false;
+  }
+
   @VisibleForTesting
   Connection getConnection(Configuration configuration) throws IOException {
     return ConnectionFactory.createConnection(configuration);
   }
 
-  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+  protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
     List<Path> referenceFiles =
         getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
     for (Path referenceFile : referenceFiles) {
       FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
-      if (status.getModificationTime() < timestamp) {
+      if (status.getModificationTime() < ts) {
         LOG.info("Including store: " + family + " for compaction for region:  " + fileSystem
             .getRegionInfo().getEncodedName() + " (reference store files)");
         return true;
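
The refactoring above turns the timestamp into a per-call parameter and isolates the
store-selection policy in shouldIncludeStore(), which the TTL request (next file)
overrides. The two policies in miniature, as a sketch rather than the patch's exact code
(storeFiles and cutoff are assumed locals):

    // Base class: include the store if ANY file predates the cutoff.
    boolean anyOld = false;
    for (StoreFileInfo sf : storeFiles) {
      if (sf.getModificationTime() < cutoff) {
        anyOld = true;
        break;
      }
    }

    // TTL subclass: include only if EVERY file predates the cutoff, so a major
    // compaction can drop the store's expired data all at once.
    boolean allOld = true;
    for (StoreFileInfo sf : storeFiles) {
      if (sf.getModificationTime() >= cutoff) {
        allOld = false;
        break;
      }
    }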
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
new file mode 100644
index 0000000..4d2b341
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * This request helps determine if a region has to be compacted based on the table's TTL.
+ */
+@InterfaceAudience.Private
+public class MajorCompactionTTLRequest extends MajorCompactionRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
+
+  MajorCompactionTTLRequest(Configuration conf, RegionInfo region) {
+    super(conf, region);
+  }
+
+  static Optional<MajorCompactionRequest> newRequest(Configuration conf, RegionInfo info,
+      TableDescriptor htd) throws IOException {
+    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
+    return request.createRequest(conf, htd);
+  }
+
+  @VisibleForTesting
+  private Optional<MajorCompactionRequest> createRequest(Configuration conf, TableDescriptor htd)
+      throws IOException {
+    Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
+    MajorCompactionRequest request = null;
+    if (!familiesToCompact.isEmpty()) {
+      LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
+      request = new MajorCompactionTTLRequest(conf, region);
+    }
+    return Optional.ofNullable(request);
+  }
+
+  Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException {
+    try(Connection connection = getConnection(configuration)) {
+      HRegionFileSystem fileSystem = getFileSystem(connection);
+      Map<String, Long> familyTTLMap = Maps.newHashMap();
+      for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
+        long ts = getColFamilyCutoffTime(descriptor);
+        // If the CF's TTL is forever, its data never expires, so don't compact this store.
+        if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
+          familyTTLMap.put(descriptor.getNameAsString(), ts);
+        }
+      }
+      return familyTTLMap;
+    }
+  }
+
+  // If the CF has no TTL, return -1, else return the current time - TTL.
+  private long getColFamilyCutoffTime(ColumnFamilyDescriptor colDesc) {
+    if (colDesc.getTimeToLive() == HConstants.FOREVER) {
+      return -1;
+    }
+    return System.currentTimeMillis() - (colDesc.getTimeToLive() * 1000L);
+  }
+
+  @Override
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+    for (StoreFileInfo storeFile : storeFiles) {
+      // Only compact when every file is older than the TTL cutoff
+      if (storeFile.getModificationTime() >= ts) {
+        LOG.info("There is at least one file in store: " + family + " file: " + storeFile.getPath()
+            + " with timestamp " + storeFile.getModificationTime()
+            + " for region: " + fileSystem.getRegionInfo().getEncodedName()
+            + " newer than the TTL cutoff: " + ts);
+        return false;
+      }
+    }
+    return true;
+  }
+}
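
The cutoff arithmetic in getColFamilyCutoffTime() is worth a worked example (values
assumed, not from the patch): a CF with a one-day TTL yields a cutoff of 24 hours ago,
and a store qualifies only when its newest file is older than that cutoff, i.e. every
cell in it has already expired:

    long ttlSeconds = 86400; // assume a CF TTL of one day
    long cutoff = System.currentTimeMillis() - ttlSeconds * 1000L;
    // If the newest store file's modification time is < cutoff, all data in the
    // store is past its TTL, and a major compaction will leave the store empty.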
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
index f7cac22..151b492 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.util.compaction;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,7 +29,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
@@ -38,15 +43,19 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
@@ -57,18 +66,24 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class MajorCompactor {
+public class MajorCompactor extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
-  private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
+  protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
 
-  private final ClusterCompactionQueues clusterCompactionQueues;
-  private final long timestamp;
-  private final Set<String> storesToCompact;
-  private final ExecutorService executor;
-  private final long sleepForMs;
-  private final Connection connection;
-  private final TableName tableName;
+  protected ClusterCompactionQueues clusterCompactionQueues;
+  private long timestamp;
+  protected Set<String> storesToCompact;
+  protected ExecutorService executor;
+  protected long sleepForMs;
+  protected Connection connection;
+  protected TableName tableName;
+  private int numServers = -1;
+  private int numRegions = -1;
+  private boolean skipWait = false;
+
+  MajorCompactor() {
+  }
 
   public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
       int concurrency, long timestamp, long sleepForMs) throws IOException {
@@ -149,15 +164,83 @@ public class MajorCompactor {
     }
     LOG.info(
         "Initializing compaction queues for table:  " + tableName + " with cf: " + storesToCompact);
+
+    Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap();
+    /*
+     * If numservers is specified, stop inspecting servers beyond that limit. This
+     * throttles the run and avoids scanning every region when there are not many
+     * regions to compact based on the criteria.
+     */
+    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
+      List<RegionInfo> regions = snRegionMap.get(sn);
+      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());
+
+      /*
+       * If the tool is run periodically, shuffle the regions so they are picked in a
+       * different random order each run. This helps when numregions is specified.
+       */
+      Collections.shuffle(regions);
+      int regionsToCompact = numRegions;
+      for (RegionInfo hri : regions) {
+        if (numRegions > 0 && regionsToCompact <= 0) {
+          LOG.debug("Reached region limit for server: " + sn);
+          break;
+        }
+
+        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
+        if (request.isPresent()) {
+          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
+          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
+          if (numRegions > 0) {
+            regionsToCompact--;
+          }
+        }
+      }
+    }
+  }
+
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
+      throws IOException {
+    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
+            timestamp);
+  }
+
+  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
+    if (numServers < 0 || snSet.size() <= numServers) {
+      return snSet;
+    } else {
+      List<ServerName> snList = Lists.newArrayList(snSet);
+      Collections.shuffle(snList);
+      return snList.subList(0, numServers);
+    }
+  }
+
+  private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
+    Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap();
     List<HRegionLocation> regionLocations =
         connection.getRegionLocator(tableName).getAllRegionLocations();
-    for (HRegionLocation location : regionLocations) {
-      Optional<MajorCompactionRequest> request = MajorCompactionRequest
-          .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
-              timestamp);
-      request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
-          .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
+    for (HRegionLocation regionLocation : regionLocations) {
+      ServerName sn = regionLocation.getServerName();
+      RegionInfo hri = regionLocation.getRegion();
+      if (!snRegionMap.containsKey(sn)) {
+        snRegionMap.put(sn, Lists.newArrayList());
+      }
+      snRegionMap.get(sn).add(hri);
     }
+    return snRegionMap;
+  }
+
+  public void setNumServers(int numServers) {
+    this.numServers = numServers;
+  }
+
+  public void setNumRegions(int numRegions) {
+    this.numRegions = numRegions;
+  }
+
+  public void setSkipWait(boolean skipWait) {
+    this.skipWait = skipWait;
   }
 
   class Compact implements Runnable {
@@ -190,52 +273,68 @@ public class MajorCompactor {
       try {
         // only make the request if the region is not already major compacting
         if (!isCompacting(request)) {
-          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          Set<String> stores = getStoresRequiringCompaction(request);
           if (!stores.isEmpty()) {
             request.setStores(stores);
             for (String store : request.getStores()) {
-              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
-                  Bytes.toBytes(store));
+              compactRegionOnServer(request, admin, store);
             }
           }
         }
-        while (isCompacting(request)) {
-          Thread.sleep(sleepForMs);
-          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
-              .getEncodedName());
+
+        /*
+         * In some scenarios, such as compacting TTLed regions, the compaction itself won't
+         * take long, so we can skip the wait. An external tool may also be triggered
+         * frequently, and the next run can identify region movements and compact them.
+         */
+        if (!skipWait) {
+          while (isCompacting(request)) {
+            Thread.sleep(sleepForMs);
+            LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
+                .getEncodedName());
+          }
         }
       } finally {
-        // Make sure to wait for the CompactedFileDischarger chore to do its work
-        int waitForArchive = connection.getConfiguration()
-            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
-        Thread.sleep(waitForArchive);
-        // check if compaction completed successfully, otherwise put that request back in the
-        // proper queue
-        Set<String> storesRequiringCompaction =
-            request.getStoresRequiringCompaction(storesToCompact);
-        if (!storesRequiringCompaction.isEmpty()) {
-          // this happens, when a region server is marked as dead, flushes a store file and
-          // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
-          // thus you have more store files on the filesystem than the regionserver knows about.
-          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
-              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
-              .equals(serverName);
-          if (regionHasNotMoved) {
-            LOG.error("Not all store files were compacted, this may be due to the regionserver not "
-                + "being aware of all store files.  Will not reattempt compacting, " + request);
-            ERRORS.add(request);
+        if (!skipWait) {
+          // Make sure to wait for the CompactedFileDischarger chore to do its work
+          int waitForArchive = connection.getConfiguration()
+              .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+          Thread.sleep(waitForArchive);
+          // check if compaction completed successfully, otherwise put that request back in the
+          // proper queue
+          Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
+          if (!storesRequiringCompaction.isEmpty()) {
+            // This happens when a region server is marked as dead and flushes a store file,
+            // but the new regionserver doesn't pick it up because it's accounted for in the
+            // WAL replay; thus there are more store files on the filesystem than the
+            // regionserver knows about.
+            boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+                .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+                .equals(serverName);
+            if (regionHasNotMoved) {
+              LOG.error(
+                  "Not all store files were compacted, this may be due to the regionserver not "
+                      + "being aware of all store files.  Will not reattempt compacting, "
+                      + request);
+              ERRORS.add(request);
+            } else {
+              request.setStores(storesRequiringCompaction);
+              clusterCompactionQueues.addToCompactionQueue(serverName, request);
+              LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+                  + " region: " + request.getRegion().getEncodedName());
+            }
           } else {
-            request.setStores(storesRequiringCompaction);
-            clusterCompactionQueues.addToCompactionQueue(serverName, request);
-            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
-                + " region: " + request.getRegion().getEncodedName());
+            LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+                + " -> cf(s): " + request.getStores());
           }
-        } else {
-          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
-              + " -> cf(s): " + request.getStores());
         }
       }
     }
+
+    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
+        throws IOException {
+      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+          Bytes.toBytes(store));
+    }
   }
 
   private boolean isCompacting(MajorCompactionRequest request) throws Exception {
@@ -263,22 +362,14 @@ public class MajorCompactor {
     }
   }
 
-  public static void main(String[] args) throws Exception {
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
+  }
+
+  protected Options getCommonOptions() {
     Options options = new Options();
-    options.addOption(
-        Option.builder("table")
-            .required()
-            .desc("table name")
-            .hasArg()
-            .build()
-    );
-    options.addOption(
-        Option.builder("cf")
-            .optionalArg(true)
-            .desc("column families: comma separated eg: a,b,c")
-            .hasArg()
-            .build()
-    );
+
     options.addOption(
         Option.builder("servers")
             .required()
@@ -327,6 +418,49 @@ public class MajorCompactor {
             .build()
     );
 
+    options.addOption(
+        Option.builder("skipWait")
+            .desc("Skip waiting after triggering compaction.")
+            .hasArg(false)
+            .build()
+    );
+
+    options.addOption(
+        Option.builder("numservers")
+            .optionalArg(true)
+            .desc("Number of servers to compact in this run, defaults to all")
+            .hasArg()
+            .build()
+    );
+
+    options.addOption(
+        Option.builder("numregions")
+            .optionalArg(true)
+            .desc("Number of regions to compact per server, defaults to all")
+            .hasArg()
+            .build()
+    );
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getCommonOptions();
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("cf")
+            .optionalArg(true)
+            .desc("column families: comma separated eg: a,b,c")
+            .hasArg()
+            .build()
+    );
+
     final CommandLineParser cmdLineParser = new DefaultParser();
     CommandLine commandLine = null;
     try {
@@ -336,12 +470,12 @@ public class MajorCompactor {
           "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
               + parseException);
       printUsage(options);
-      return;
+      return -1;
     }
     if (commandLine == null) {
       System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
       printUsage(options);
-      return;
+      return -1;
     }
     String tableName = commandLine.getOptionValue("table");
     String cf = commandLine.getOptionValue("cf", null);
@@ -350,8 +484,7 @@ public class MajorCompactor {
       Iterables.addAll(families, Splitter.on(",").split(cf));
     }
 
-
-    Configuration configuration = HBaseConfiguration.create();
+    Configuration configuration = getConf();
     int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
     long minModTime = Long.parseLong(
         commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
@@ -360,25 +493,35 @@ public class MajorCompactor {
     String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
     long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
 
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+
     configuration.set(HConstants.HBASE_DIR, rootDir);
     configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
 
     MajorCompactor compactor =
         new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
             minModTime, sleep);
+    compactor.setNumServers(numServers);
+    compactor.setNumRegions(numRegions);
+    compactor.setSkipWait(commandLine.hasOption("skipWait"));
 
     compactor.initializeWorkQueues();
     if (!commandLine.hasOption("dryRun")) {
       compactor.compactAllRegions();
     }
     compactor.shutdown();
+    return ERRORS.size();
   }
 
-  private static void printUsage(final Options options) {
+  protected static void printUsage(final Options options) {
     String header = "\nUsage instructions\n\n";
     String footer = "\n";
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
   }
 
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args);
+  }
 }
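
Putting the new knobs together, a hedged sketch of programmatic use of the enhanced tool
(table name, family, and limits below are placeholders, not part of this commit):

    Configuration conf = HBaseConfiguration.create();
    MajorCompactor compactor = new MajorCompactor(conf, TableName.valueOf("usertable"),
        Sets.newHashSet("a"), 4 /* concurrency */,
        System.currentTimeMillis() /* minModTime */, 30000 /* sleep ms */);
    compactor.setNumServers(3);   // inspect at most 3 servers this run
    compactor.setNumRegions(10);  // queue at most 10 regions per server
    compactor.setSkipWait(true);  // don't poll; a later run catches stragglers
    compactor.initializeWorkQueues();
    compactor.compactAllRegions();
    compactor.shutdown();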
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
new file mode 100644
index 0000000..321cbe0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * This tool compacts a table's regions that are beyond its TTL. It helps save disk space,
+ * since such regions become empty as a result of compaction.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactorTTL extends MajorCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactorTTL.class);
+
+  private TableDescriptor htd;
+
+  @VisibleForTesting
+  public MajorCompactorTTL(Configuration conf, TableDescriptor htd, int concurrency,
+      long sleepForMs) throws IOException {
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.htd = htd;
+    this.tableName = htd.getTableName();
+    this.storesToCompact = Sets.newHashSet(); // Empty set so all stores will be compacted
+    this.sleepForMs = sleepForMs;
+    this.executor = Executors.newFixedThreadPool(concurrency);
+    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+  }
+
+  protected MajorCompactorTTL() {
+    super();
+  }
+
+  @Override
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
+      throws IOException {
+    return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
+  }
+
+  @Override
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return ((MajorCompactionTTLRequest)request).getStoresRequiringCompaction(htd).keySet();
+  }
+
+  public int compactRegionsTTLOnTable(Configuration conf, String table, int concurrency,
+      long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
+      throws Exception {
+
+    Connection conn = ConnectionFactory.createConnection(conf);
+    TableName tableName = TableName.valueOf(table);
+
+    TableDescriptor htd = conn.getAdmin().getDescriptor(tableName);
+    if (!doesAnyColFamilyHaveTTL(htd)) {
+      LOG.info("No TTL present for CF of table: " + tableName + ", skipping compaction");
+      return 0;
+    }
+
+    LOG.info("Major compacting table " + tableName + " based on TTL");
+    MajorCompactor compactor = new MajorCompactorTTL(conf, htd, concurrency, sleep);
+    compactor.setNumServers(numServers);
+    compactor.setNumRegions(numRegions);
+    compactor.setSkipWait(skipWait);
+
+    compactor.initializeWorkQueues();
+    if (!dryRun) {
+      compactor.compactAllRegions();
+    }
+    compactor.shutdown();
+    return ERRORS.size();
+  }
+
+  private boolean doesAnyColFamilyHaveTTL(TableDescriptor htd) {
+    for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
+      if (descriptor.getTimeToLive() != HConstants.FOREVER) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Options getOptions() {
+    Options options = getCommonOptions();
+
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getOptions();
+
+    final CommandLineParser cmdLineParser = new DefaultParser();
+    CommandLine commandLine;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.out.println(
+          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+              + parseException);
+      printUsage(options);
+      return -1;
+    }
+    if (commandLine == null) {
+      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
+      printUsage(options);
+      return -1;
+    }
+
+    String table = commandLine.getOptionValue("table");
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+    boolean dryRun = commandLine.hasOption("dryRun");
+    boolean skipWait = commandLine.hasOption("skipWait");
+
+    return compactRegionsTTLOnTable(getConf(), table, concurrency, sleep,
+        numServers, numRegions, dryRun, skipWait);
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args);
+  }
+}
\ No newline at end of file
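
The table-level entry point can likewise be called directly; a sketch with placeholder
arguments (same-package access assumed, since the no-arg constructor is protected):

    int errors = new MajorCompactorTTL().compactRegionsTTLOnTable(
        HBaseConfiguration.create(), "usertable", 1 /* concurrency */,
        30000 /* sleep ms */, -1 /* numServers */, -1 /* numRegions */,
        false /* dryRun */, true /* skipWait */);
    // errors is the number of regions whose compaction failed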
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
index adecd5c..c125c6e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
@@ -17,11 +17,24 @@
  */
 package org.apache.hadoop.hbase.util.compaction;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -39,24 +52,13 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 
 @Category({SmallTests.class})
 public class TestMajorCompactionRequest {
@@ -64,10 +66,10 @@ public class TestMajorCompactionRequest {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMajorCompactionRequest.class);
 
-  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
-  private static final String FAMILY = "a";
-  private Path rootRegionDir;
-  private Path regionStoreDir;
+  protected static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  protected static final String FAMILY = "a";
+  protected Path rootRegionDir;
+  protected Path regionStoreDir;
 
   @Before public void setUp() throws Exception {
     rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionRequest");
@@ -77,15 +79,15 @@ public class TestMajorCompactionRequest {
   @Test public void testStoresNeedingCompaction() throws Exception {
     // store files older than timestamp
     List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
-    MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+    MajorCompactionRequest request = makeMockRequest(storeFiles, false);
     Optional<MajorCompactionRequest> result =
-        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertTrue(result.isPresent());
 
     // store files newer than timestamp
     storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
-    request = makeMockRequest(100, storeFiles, false);
-    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    request = makeMockRequest(storeFiles, false);
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertFalse(result.isPresent());
   }
 
@@ -106,16 +108,17 @@ public class TestMajorCompactionRequest {
     HRegionFileSystem fileSystem =
         mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
     MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
-        region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+        region.getRegionInfo(), Sets.newHashSet(FAMILY)));
     doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
     doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
         any(Path.class));
     doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
-    Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+    Set<String> result =
+        majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
     assertEquals(FAMILY, Iterables.getOnlyElement(result));
   }
 
-  private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+  protected HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
       List<StoreFileInfo> storeFiles) throws IOException {
     long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
     return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
@@ -138,7 +141,7 @@ public class TestMajorCompactionRequest {
     return mockSystem;
   }
 
-  private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+  protected List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
       throws IOException {
     List<StoreFileInfo> infos = Lists.newArrayList();
     int i = 0;
@@ -153,14 +156,14 @@ public class TestMajorCompactionRequest {
     return infos;
   }
 
-  private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+  private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
       boolean references) throws IOException {
     Configuration configuration = mock(Configuration.class);
     RegionInfo regionInfo = mock(RegionInfo.class);
     when(regionInfo.getEncodedName()).thenReturn("HBase");
     when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"));
     MajorCompactionRequest spy = spy(request);
     HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
     doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
new file mode 100644
index 0000000..f15b887
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category({SmallTests.class})
+public class TestMajorCompactionTTLRequest extends TestMajorCompactionRequest {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMajorCompactionTTLRequest.class);
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionTTLRequest");
+    regionStoreDir = new Path(rootRegionDir, FAMILY);
+  }
+
+  @Test
+  public void testStoresNeedingCompaction() throws Exception {
+    // 5 store files with modification timestamp 10
+    List<StoreFileInfo> storeFiles1 = mockStoreFiles(regionStoreDir, 5, 10);
+    // 5 store files with modification timestamp 100
+    List<StoreFileInfo> storeFiles2 = mockStoreFiles(regionStoreDir, 5, 100);
+    List<StoreFileInfo> storeFiles = Lists.newArrayList(storeFiles1);
+    storeFiles.addAll(storeFiles2);
+
+    MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
+    // The newest files (timestamp 100) are not older than a cutoff of 10, so no compaction.
+    Optional<MajorCompactionRequest> result =
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
+    assertFalse(result.isPresent());
+
+    // Files at timestamp 100 are not strictly older than a cutoff of 100, so still no compaction.
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
+    assertFalse(result.isPresent());
+
+    // With a cutoff of 101 every file is older than the cutoff, so the region should be compacted.
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101);
+    assertTrue(result.isPresent());
+  }
+
+  private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
+      throws IOException {
+    Configuration configuration = mock(Configuration.class);
+    RegionInfo regionInfo = mock(RegionInfo.class);
+    when(regionInfo.getEncodedName()).thenReturn("HBase");
+    when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
+    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo);
+    MajorCompactionTTLRequest spy = spy(request);
+    HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
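+    // Stub the filesystem and connection lookups so the request never contacts a real cluster.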
+    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
+    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+    return spy;
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
index ccf0146..7ea80b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util.compaction;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -42,7 +43,8 @@ public class TestMajorCompactor {
       HBaseClassTestRule.forClass(TestMajorCompactor.class);
 
   public static final byte[] FAMILY = Bytes.toBytes("a");
-  private HBaseTestingUtility utility;
+  protected HBaseTestingUtility utility;
+  protected Admin admin;
 
   @Before public void setUp() throws Exception {
     utility = new HBaseTestingUtility();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
similarity index 56%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
index ccf0146..44abda6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,70 +17,103 @@
  */
 package org.apache.hadoop.hbase.util.compaction;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
+import org.junit.rules.TestName;
 
 @Category({ MiscTests.class, MediumTests.class })
-public class TestMajorCompactor {
+public class TestMajorCompactorTTL extends TestMajorCompactor {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMajorCompactor.class);
+      HBaseClassTestRule.forClass(TestMajorCompactorTTL.class);
 
-  public static final byte[] FAMILY = Bytes.toBytes("a");
-  private HBaseTestingUtility utility;
+  @Rule
+  public TestName name = new TestName();
 
-  @Before public void setUp() throws Exception {
+  @Before
+  @Override
+  public void setUp() throws Exception {
     utility = new HBaseTestingUtility();
     utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
     utility.startMiniCluster();
+    admin = utility.getAdmin();
   }
 
-  @After public void tearDown() throws Exception {
+  @After
+  @Override
+  public void tearDown() throws Exception {
     utility.shutdownMiniCluster();
   }
 
-  @Test public void testCompactingATable() throws Exception {
-    TableName tableName = TableName.valueOf("TestMajorCompactor");
-    utility.createMultiRegionTable(tableName, FAMILY, 5);
-    utility.waitTableAvailable(tableName);
-    Connection connection = utility.getConnection();
-    Table table = connection.getTable(tableName);
-    // write data and flush multiple store files:
-    for (int i = 0; i < 5; i++) {
-      utility.loadRandomRows(table, FAMILY, 50, 100);
-      utility.flush(tableName);
-    }
-    table.close();
-    int numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+  @Test
+  public void testCompactingATable() throws Exception {
+    TableName tableName = createTable(name.getMethodName());
+
+    // Wait long enough that the files written above age past the 5 second TTL set below.
+    Thread.sleep(10 * 1000);
+
+    int numberOfRegions = admin.getRegions(tableName).size();
     int numHFiles = utility.getNumHFiles(tableName, FAMILY);
     // before major compaction the table should have more store files than regions.
     assertTrue(numberOfRegions < numHFiles);
+    modifyTTL(tableName);
 
-    MajorCompactor compactor =
-        new MajorCompactor(utility.getConfiguration(), tableName,
-            Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
+    MajorCompactorTTL compactor = new MajorCompactorTTL(utility.getConfiguration(),
+        admin.getDescriptor(tableName), 1, 200);
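+    // Queue up the table's regions, compact them all, then stop the compactor's worker threads.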
     compactor.initializeWorkQueues();
     compactor.compactAllRegions();
     compactor.shutdown();
 
     // verify that the table has been completely major compacted.
-    numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+    numberOfRegions = admin.getRegions(tableName).size();
     numHFiles = utility.getNumHFiles(tableName, FAMILY);
-    assertEquals(numHFiles, numberOfRegions);
+    assertEquals(numberOfRegions, numHFiles);
+  }
+
+  protected void modifyTTL(TableName tableName) throws IOException, InterruptedException {
+    // Set the TTL to 5 seconds, so all the files written above are expired and removed on compaction.
+    admin.disableTable(tableName);
+    utility.waitTableDisabled(tableName.getName());
+    TableDescriptor descriptor = admin.getDescriptor(tableName);
+    ColumnFamilyDescriptor colDesc = descriptor.getColumnFamily(FAMILY);
+    ColumnFamilyDescriptorBuilder cFDB = ColumnFamilyDescriptorBuilder.newBuilder(colDesc);
+    cFDB.setTimeToLive(5);
+    admin.modifyColumnFamily(tableName, cFDB.build());
+    admin.enableTable(tableName);
+    utility.waitTableEnabled(tableName);
+  }
+
+  protected TableName createTable(String name) throws IOException, InterruptedException {
+    TableName tableName = TableName.valueOf(name);
+    utility.createMultiRegionTable(tableName, FAMILY, 5);
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    // write data and flush multiple store files:
+    for (int i = 0; i < 5; i++) {
+      utility.loadRandomRows(table, FAMILY, 50, 100);
+      utility.flush(tableName);
+    }
+    table.close();
+    return tableName;
   }
 }
\ No newline at end of file