You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/05/01 00:29:41 UTC
[hbase] branch master updated: HBASE-21883 Enhancements to Major
Compaction tool
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 32250e5 HBASE-21883 Enhancements to Major Compaction tool
32250e5 is described below
commit 32250e55ba7deff5a1b50f9b861a5077c6677956
Author: Thiruvel Thirumoolan <th...@oath.com>
AuthorDate: Tue Mar 26 19:43:02 2019 -0700
HBASE-21883 Enhancements to Major Compaction tool
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../hbase/rsgroup/RSGroupMajorCompactionTTL.java | 131 ++++++++++
.../rsgroup/TestRSGroupMajorCompactionTTL.java | 106 ++++++++
.../util/compaction/MajorCompactionRequest.java | 101 ++++----
.../util/compaction/MajorCompactionTTLRequest.java | 109 ++++++++
.../hbase/util/compaction/MajorCompactor.java | 281 ++++++++++++++++-----
.../hbase/util/compaction/MajorCompactorTTL.java | 175 +++++++++++++
.../compaction/TestMajorCompactionRequest.java | 59 +++--
.../compaction/TestMajorCompactionTTLRequest.java | 100 ++++++++
.../hbase/util/compaction/TestMajorCompactor.java | 4 +-
...orCompactor.java => TestMajorCompactorTTL.java} | 93 ++++---
10 files changed, 988 insertions(+), 171 deletions(-)
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
new file mode 100644
index 0000000..d1b3751
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * This script takes an rsgroup as argument and compacts part/all of regions of that table
+ * based on the table's TTL.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class RSGroupMajorCompactionTTL extends MajorCompactorTTL {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RSGroupMajorCompactionTTL.class);
+
+ @VisibleForTesting
+ RSGroupMajorCompactionTTL() {
+ super();
+ }
+
+ public int compactTTLRegionsOnGroup(Configuration conf, String rsgroup, int concurrency,
+ long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
+ throws Exception {
+
+ Connection conn = ConnectionFactory.createConnection(conf);
+ RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);
+
+ RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(rsgroup);
+ if (rsGroupInfo == null) {
+ LOG.error("Invalid rsgroup specified: " + rsgroup);
+ throw new IllegalArgumentException("Invalid rsgroup specified: " + rsgroup);
+ }
+
+ for (TableName tableName : rsGroupInfo.getTables()) {
+ int status = compactRegionsTTLOnTable(conf, tableName.getNameAsString(), concurrency, sleep,
+ numServers, numRegions, dryRun, skipWait);
+ if (status != 0) {
+ LOG.error("Failed to compact table: " + tableName);
+ return status;
+ }
+ }
+ return 0;
+ }
+
+ protected Options getOptions() {
+ Options options = getCommonOptions();
+
+ options.addOption(
+ Option.builder("rsgroup")
+ .required()
+ .desc("Tables of rsgroup to be compacted")
+ .hasArg()
+ .build()
+ );
+
+ return options;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = getOptions();
+
+ final CommandLineParser cmdLineParser = new DefaultParser();
+ CommandLine commandLine;
+ try {
+ commandLine = cmdLineParser.parse(options, args);
+ } catch (ParseException parseException) {
+ System.out.println(
+ "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+ + parseException);
+ printUsage(options);
+ return -1;
+ }
+ if (commandLine == null) {
+ System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
+ printUsage(options);
+ return -1;
+ }
+
+ String rsgroup = commandLine.getOptionValue("rsgroup");
+ int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+ int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+ int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
+ long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+ boolean dryRun = commandLine.hasOption("dryRun");
+ boolean skipWait = commandLine.hasOption("skipWait");
+ Configuration conf = getConf();
+
+ return compactTTLRegionsOnGroup(conf, rsgroup, concurrency, sleep, numServers, numRegions,
+ dryRun, skipWait);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(HBaseConfiguration.create(), new RSGroupMajorCompactionTTL(), args);
+ }
+}
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java
new file mode 100644
index 0000000..9b3dcbb
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.compaction.TestMajorCompactorTTL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRSGroupMajorCompactionTTL.class);
+
+ private final static int NUM_SLAVES_BASE = 6;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ utility = new HBaseTestingUtility();
+ Configuration conf = utility.getConfiguration();
+ conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName());
+ conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
+ conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
+ conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
+ utility.startMiniCluster(NUM_SLAVES_BASE);
+ MiniHBaseCluster cluster = utility.getHBaseCluster();
+ final HMaster master = cluster.getMaster();
+
+ //wait for balancer to come online
+ utility.waitFor(60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() {
+ return master.isInitialized() &&
+ ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+ }
+ });
+ admin = utility.getAdmin();
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ utility.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCompactingTables() throws Exception {
+ List<TableName> tableNames = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ tableNames.add(createTable(name.getMethodName() + "___" + i));
+ }
+
+ // Delay a bit, so we can set the table TTL to 5 seconds
+ Thread.sleep(10 * 1000);
+
+ for (TableName tableName : tableNames) {
+ int numberOfRegions = admin.getRegions(tableName).size();
+ int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+ // we should have a table with more store files than we would before we major compacted.
+ assertTrue(numberOfRegions < numHFiles);
+ modifyTTL(tableName);
+ }
+
+ RSGroupMajorCompactionTTL compactor = new RSGroupMajorCompactionTTL();
+ compactor.compactTTLRegionsOnGroup(utility.getConfiguration(),
+ RSGroupInfo.DEFAULT_GROUP, 1, 200, -1, -1, false, false);
+
+ for (TableName tableName : tableNames) {
+ int numberOfRegions = admin.getRegions(tableName).size();
+ int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+ assertEquals(numberOfRegions, numHFiles);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
index 51b2b9d..cf5fcd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -44,25 +44,27 @@ class MajorCompactionRequest {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
- private final Configuration configuration;
- private final RegionInfo region;
+ protected final Configuration configuration;
+ protected final RegionInfo region;
private Set<String> stores;
- private final long timestamp;
- @VisibleForTesting
- MajorCompactionRequest(Configuration configuration, RegionInfo region,
- Set<String> stores, long timestamp) {
+ MajorCompactionRequest(Configuration configuration, RegionInfo region) {
this.configuration = configuration;
this.region = region;
+ }
+
+ @VisibleForTesting
+ MajorCompactionRequest(Configuration configuration, RegionInfo region,
+ Set<String> stores) {
+ this(configuration, region);
this.stores = stores;
- this.timestamp = timestamp;
}
static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
Set<String> stores, long timestamp) throws IOException {
MajorCompactionRequest request =
- new MajorCompactionRequest(configuration, info, stores, timestamp);
- return request.createRequest(configuration, stores);
+ new MajorCompactionRequest(configuration, info, stores);
+ return request.createRequest(configuration, stores, timestamp);
}
RegionInfo getRegion() {
@@ -79,67 +81,80 @@ class MajorCompactionRequest {
@VisibleForTesting
Optional<MajorCompactionRequest> createRequest(Configuration configuration,
- Set<String> stores) throws IOException {
- Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+ Set<String> stores, long timestamp) throws IOException {
+ Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
MajorCompactionRequest request = null;
if (!familiesToCompact.isEmpty()) {
- request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+ request = new MajorCompactionRequest(configuration, region, familiesToCompact);
}
return Optional.ofNullable(request);
}
- Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+ Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
+ throws IOException {
try(Connection connection = getConnection(configuration)) {
HRegionFileSystem fileSystem = getFileSystem(connection);
Set<String> familiesToCompact = Sets.newHashSet();
for (String family : requestedStores) {
- // do we have any store files?
- Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
- if (storeFiles == null) {
- LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
- .getRegionInfo().getEncodedName(), " has no store files");
- continue;
- }
- // check for reference files
- if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+ if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
familiesToCompact.add(family);
- LOG.info("Including store: " + family + " with: " + storeFiles.size()
- + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
- continue;
- }
- // check store file timestamps
- boolean includeStore = false;
- for (StoreFileInfo storeFile : storeFiles) {
- if (storeFile.getModificationTime() < timestamp) {
- LOG.info("Including store: " + family + " with: " + storeFiles.size()
- + " files for compaction for region: "
- + fileSystem.getRegionInfo().getEncodedName());
- familiesToCompact.add(family);
- includeStore = true;
- break;
- }
- }
- if (!includeStore) {
- LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
- .getRegionInfo().getEncodedName(), " already compacted");
}
}
return familiesToCompact;
}
}
+ boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
+ throws IOException {
+
+ // do we have any store files?
+ Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+ if (storeFiles == null) {
+ LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+ .getRegionInfo().getEncodedName(), " has no store files");
+ return false;
+ }
+ // check for reference files
+ if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family, ts)) {
+ LOG.info("Including store: " + family + " with: " + storeFiles.size()
+ + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
+ return true;
+ }
+ // check store file timestamps
+ boolean includeStore = this.shouldIncludeStore(fileSystem, family, storeFiles, ts);
+ if (!includeStore) {
+ LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+ .getRegionInfo().getEncodedName() + " already compacted");
+ }
+ return includeStore;
+ }
+
+ protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+ Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+ for (StoreFileInfo storeFile : storeFiles) {
+ if (storeFile.getModificationTime() < ts) {
+ LOG.info("Including store: " + family + " with: " + storeFiles.size()
+ + " files for compaction for region: "
+ + fileSystem.getRegionInfo().getEncodedName());
+ return true;
+ }
+ }
+ return false;
+ }
+
@VisibleForTesting
Connection getConnection(Configuration configuration) throws IOException {
return ConnectionFactory.createConnection(configuration);
}
- private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+ protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
throws IOException {
List<Path> referenceFiles =
getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
for (Path referenceFile : referenceFiles) {
FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
- if (status.getModificationTime() < timestamp) {
+ if (status.getModificationTime() < ts) {
LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
.getRegionInfo().getEncodedName() + " (reference store files)");
return true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
new file mode 100644
index 0000000..4d2b341
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * This request helps determine if a region has to be compacted based on table's TTL.
+ */
+@InterfaceAudience.Private
+public class MajorCompactionTTLRequest extends MajorCompactionRequest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
+
+ MajorCompactionTTLRequest(Configuration conf, RegionInfo region) {
+ super(conf, region);
+ }
+
+ static Optional<MajorCompactionRequest> newRequest(Configuration conf, RegionInfo info,
+ TableDescriptor htd) throws IOException {
+ MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
+ return request.createRequest(conf, htd);
+ }
+
+ @VisibleForTesting
+ private Optional<MajorCompactionRequest> createRequest(Configuration conf, TableDescriptor htd)
+ throws IOException {
+ Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
+ MajorCompactionRequest request = null;
+ if (!familiesToCompact.isEmpty()) {
+ LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
+ request = new MajorCompactionTTLRequest(conf, region);
+ }
+ return Optional.ofNullable(request);
+ }
+
+ Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException {
+ try(Connection connection = getConnection(configuration)) {
+ HRegionFileSystem fileSystem = getFileSystem(connection);
+ Map<String, Long> familyTTLMap = Maps.newHashMap();
+ for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
+ long ts = getColFamilyCutoffTime(descriptor);
+ // If the table's TTL is forever, lets not compact any of the regions.
+ if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
+ familyTTLMap.put(descriptor.getNameAsString(), ts);
+ }
+ }
+ return familyTTLMap;
+ }
+ }
+
+ // If the CF has no TTL, return -1, else return the current time - TTL.
+ private long getColFamilyCutoffTime(ColumnFamilyDescriptor colDesc) {
+ if (colDesc.getTimeToLive() == HConstants.FOREVER) {
+ return -1;
+ }
+ return System.currentTimeMillis() - (colDesc.getTimeToLive() * 1000L);
+ }
+
+ @Override
+ protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+ Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+ for (StoreFileInfo storeFile : storeFiles) {
+ // Lets only compact when all files are older than TTL
+ if (storeFile.getModificationTime() >= ts) {
+ LOG.info("There is atleast one file in store: " + family + " file: " + storeFile.getPath()
+ + " with timestamp " + storeFile.getModificationTime()
+ + " for region: " + fileSystem.getRegionInfo().getEncodedName()
+ + " older than TTL: " + ts);
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
index f7cac22..151b492 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.util.compaction;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,7 +29,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
@@ -38,15 +43,19 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
@@ -57,18 +66,24 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class MajorCompactor {
+public class MajorCompactor extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
- private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
+ protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
- private final ClusterCompactionQueues clusterCompactionQueues;
- private final long timestamp;
- private final Set<String> storesToCompact;
- private final ExecutorService executor;
- private final long sleepForMs;
- private final Connection connection;
- private final TableName tableName;
+ protected ClusterCompactionQueues clusterCompactionQueues;
+ private long timestamp;
+ protected Set<String> storesToCompact;
+ protected ExecutorService executor;
+ protected long sleepForMs;
+ protected Connection connection;
+ protected TableName tableName;
+ private int numServers = -1;
+ private int numRegions = -1;
+ private boolean skipWait = false;
+
+ MajorCompactor() {
+ }
public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
int concurrency, long timestamp, long sleepForMs) throws IOException {
@@ -149,15 +164,83 @@ public class MajorCompactor {
}
LOG.info(
"Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+
+ Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap();
+ /*
+ * If numservers is specified, stop inspecting regions beyond the numservers, it will serve
+ * to throttle and won't end up scanning all the regions in the event there are not many
+ * regions to compact based on the criteria.
+ */
+ for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
+ List<RegionInfo> regions = snRegionMap.get(sn);
+ LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());
+
+ /*
+ * If the tool is run periodically, then we could shuffle the regions and provide
+ * some random order to select regions. Helps if numregions is specified.
+ */
+ Collections.shuffle(regions);
+ int regionsToCompact = numRegions;
+ for (RegionInfo hri : regions) {
+ if (numRegions > 0 && regionsToCompact <= 0) {
+ LOG.debug("Reached region limit for server: " + sn);
+ break;
+ }
+
+ Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
+ if (request.isPresent()) {
+ LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
+ clusterCompactionQueues.addToCompactionQueue(sn, request.get());
+ if (numRegions > 0) {
+ regionsToCompact--;
+ }
+ }
+ }
+ }
+ }
+
+ protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
+ throws IOException {
+ return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
+ timestamp);
+ }
+
+ private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
+ if(numServers < 0 || snSet.size() <= numServers) {
+ return snSet;
+
+ } else {
+ List<ServerName> snList = Lists.newArrayList(snSet);
+ Collections.shuffle(snList);
+ return snList.subList(0, numServers);
+ }
+ }
+
+ private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
+ Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap();
List<HRegionLocation> regionLocations =
connection.getRegionLocator(tableName).getAllRegionLocations();
- for (HRegionLocation location : regionLocations) {
- Optional<MajorCompactionRequest> request = MajorCompactionRequest
- .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
- timestamp);
- request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
- .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
+ for (HRegionLocation regionLocation : regionLocations) {
+ ServerName sn = regionLocation.getServerName();
+ RegionInfo hri = regionLocation.getRegion();
+ if (!snRegionMap.containsKey(sn)) {
+ snRegionMap.put(sn, Lists.newArrayList());
+ }
+ snRegionMap.get(sn).add(hri);
}
+ return snRegionMap;
+ }
+
+ public void setNumServers(int numServers) {
+ this.numServers = numServers;
+ }
+
+ public void setNumRegions(int numRegions) {
+ this.numRegions = numRegions;
+ }
+
+ public void setSkipWait(boolean skipWait) {
+ this.skipWait = skipWait;
}
class Compact implements Runnable {
@@ -190,52 +273,68 @@ public class MajorCompactor {
try {
// only make the request if the region is not already major compacting
if (!isCompacting(request)) {
- Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+ Set<String> stores = getStoresRequiringCompaction(request);
if (!stores.isEmpty()) {
request.setStores(stores);
for (String store : request.getStores()) {
- admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
- Bytes.toBytes(store));
+ compactRegionOnServer(request, admin, store);
}
}
}
- while (isCompacting(request)) {
- Thread.sleep(sleepForMs);
- LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
- .getEncodedName());
+
+ /*
+ * In some scenarios like compacting TTLed regions, the compaction itself won't take time
+ * and hence we can skip the wait. An external tool will also be triggered frequently and
+ * the next run can identify region movements and compact them.
+ */
+ if (!skipWait) {
+ while (isCompacting(request)) {
+ Thread.sleep(sleepForMs);
+ LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
+ .getEncodedName());
+ }
}
} finally {
- // Make sure to wait for the CompactedFileDischarger chore to do its work
- int waitForArchive = connection.getConfiguration()
- .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
- Thread.sleep(waitForArchive);
- // check if compaction completed successfully, otherwise put that request back in the
- // proper queue
- Set<String> storesRequiringCompaction =
- request.getStoresRequiringCompaction(storesToCompact);
- if (!storesRequiringCompaction.isEmpty()) {
- // this happens, when a region server is marked as dead, flushes a store file and
- // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
- // thus you have more store files on the filesystem than the regionserver knows about.
- boolean regionHasNotMoved = connection.getRegionLocator(tableName)
- .getRegionLocation(request.getRegion().getStartKey()).getServerName()
- .equals(serverName);
- if (regionHasNotMoved) {
- LOG.error("Not all store files were compacted, this may be due to the regionserver not "
- + "being aware of all store files. Will not reattempt compacting, " + request);
- ERRORS.add(request);
+ if (!skipWait) {
+ // Make sure to wait for the CompactedFileDischarger chore to do its work
+ int waitForArchive = connection.getConfiguration()
+ .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+ Thread.sleep(waitForArchive);
+ // check if compaction completed successfully, otherwise put that request back in the
+ // proper queue
+ Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
+ if (!storesRequiringCompaction.isEmpty()) {
+ // this happens, when a region server is marked as dead, flushes a store file and
+ // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
+ // thus you have more store files on the filesystem than the regionserver knows about.
+ boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+ .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+ .equals(serverName);
+ if (regionHasNotMoved) {
+ LOG.error(
+ "Not all store files were compacted, this may be due to the regionserver not "
+ + "being aware of all store files. Will not reattempt compacting, "
+ + request);
+ ERRORS.add(request);
+ } else {
+ request.setStores(storesRequiringCompaction);
+ clusterCompactionQueues.addToCompactionQueue(serverName, request);
+ LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+ + " region: " + request.getRegion().getEncodedName());
+ }
} else {
- request.setStores(storesRequiringCompaction);
- clusterCompactionQueues.addToCompactionQueue(serverName, request);
- LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
- + " region: " + request.getRegion().getEncodedName());
+ LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+ + " -> cf(s): " + request.getStores());
}
- } else {
- LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
- + " -> cf(s): " + request.getStores());
}
}
}
+
+ private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
+ throws IOException {
+ admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+ Bytes.toBytes(store));
+ }
}
private boolean isCompacting(MajorCompactionRequest request) throws Exception {
@@ -263,22 +362,14 @@ public class MajorCompactor {
}
}
- public static void main(String[] args) throws Exception {
+ protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+ throws IOException {
+ return request.getStoresRequiringCompaction(storesToCompact, timestamp);
+ }
+
+ protected Options getCommonOptions() {
Options options = new Options();
- options.addOption(
- Option.builder("table")
- .required()
- .desc("table name")
- .hasArg()
- .build()
- );
- options.addOption(
- Option.builder("cf")
- .optionalArg(true)
- .desc("column families: comma separated eg: a,b,c")
- .hasArg()
- .build()
- );
+
options.addOption(
Option.builder("servers")
.required()
@@ -327,6 +418,49 @@ public class MajorCompactor {
.build()
);
+ options.addOption(
+ Option.builder("skipWait")
+ .desc("Skip waiting after triggering compaction.")
+ .hasArg(false)
+ .build()
+ );
+
+ options.addOption(
+ Option.builder("numservers")
+ .optionalArg(true)
+ .desc("Number of servers to compact in this run, defaults to all")
+ .hasArg()
+ .build()
+ );
+
+ options.addOption(
+ Option.builder("numregions")
+ .optionalArg(true)
+ .desc("Number of regions to compact per server, defaults to all")
+ .hasArg()
+ .build()
+ );
+ return options;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = getCommonOptions();
+ options.addOption(
+ Option.builder("table")
+ .required()
+ .desc("table name")
+ .hasArg()
+ .build()
+ );
+ options.addOption(
+ Option.builder("cf")
+ .optionalArg(true)
+ .desc("column families: comma separated eg: a,b,c")
+ .hasArg()
+ .build()
+ );
+
final CommandLineParser cmdLineParser = new DefaultParser();
CommandLine commandLine = null;
try {
@@ -336,12 +470,12 @@ public class MajorCompactor {
"ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+ parseException);
printUsage(options);
- return;
+ return -1;
}
if (commandLine == null) {
System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
printUsage(options);
- return;
+ return -1;
}
String tableName = commandLine.getOptionValue("table");
String cf = commandLine.getOptionValue("cf", null);
@@ -350,8 +484,7 @@ public class MajorCompactor {
Iterables.addAll(families, Splitter.on(",").split(cf));
}
-
- Configuration configuration = HBaseConfiguration.create();
+ Configuration configuration = getConf();
int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
long minModTime = Long.parseLong(
commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
@@ -360,25 +493,35 @@ public class MajorCompactor {
String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+ int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+ int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+
configuration.set(HConstants.HBASE_DIR, rootDir);
configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
MajorCompactor compactor =
new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
minModTime, sleep);
+ compactor.setNumServers(numServers);
+ compactor.setNumRegions(numRegions);
+ compactor.setSkipWait(commandLine.hasOption("skipWait"));
compactor.initializeWorkQueues();
if (!commandLine.hasOption("dryRun")) {
compactor.compactAllRegions();
}
compactor.shutdown();
+ return ERRORS.size();
}
- private static void printUsage(final Options options) {
+ protected static void printUsage(final Options options) {
String header = "\nUsage instructions\n\n";
String footer = "\n";
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
}
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
new file mode 100644
index 0000000..321cbe0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * This tool compacts a table's regions that are beyond it's TTL. It helps to save disk space and
+ * regions become empty as a result of compaction.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactorTTL extends MajorCompactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MajorCompactorTTL .class);
+
+ private TableDescriptor htd;
+
+ @VisibleForTesting
+ public MajorCompactorTTL(Configuration conf, TableDescriptor htd, int concurrency,
+ long sleepForMs) throws IOException {
+ this.connection = ConnectionFactory.createConnection(conf);
+ this.htd = htd;
+ this.tableName = htd.getTableName();
+ this.storesToCompact = Sets.newHashSet(); // Empty set so all stores will be compacted
+ this.sleepForMs = sleepForMs;
+ this.executor = Executors.newFixedThreadPool(concurrency);
+ this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+ }
+
+ protected MajorCompactorTTL() {
+ super();
+ }
+
+ @Override
+ protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
+ throws IOException {
+ return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
+ }
+
+ @Override
+ protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+ throws IOException {
+ return ((MajorCompactionTTLRequest)request).getStoresRequiringCompaction(htd).keySet();
+ }
+
+ public int compactRegionsTTLOnTable(Configuration conf, String table, int concurrency,
+ long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
+ throws Exception {
+
+ Connection conn = ConnectionFactory.createConnection(conf);
+ TableName tableName = TableName.valueOf(table);
+
+ TableDescriptor htd = conn.getAdmin().getDescriptor(tableName);
+ if (!doesAnyColFamilyHaveTTL(htd)) {
+ LOG.info("No TTL present for CF of table: " + tableName + ", skipping compaction");
+ return 0;
+ }
+
+ LOG.info("Major compacting table " + tableName + " based on TTL");
+ MajorCompactor compactor = new MajorCompactorTTL(conf, htd, concurrency, sleep);
+ compactor.setNumServers(numServers);
+ compactor.setNumRegions(numRegions);
+ compactor.setSkipWait(skipWait);
+
+ compactor.initializeWorkQueues();
+ if (!dryRun) {
+ compactor.compactAllRegions();
+ }
+ compactor.shutdown();
+ return ERRORS.size();
+ }
+
+ private boolean doesAnyColFamilyHaveTTL(TableDescriptor htd) {
+ for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
+ if (descriptor.getTimeToLive() != HConstants.FOREVER) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Options getOptions() {
+ Options options = getCommonOptions();
+
+ options.addOption(
+ Option.builder("table")
+ .required()
+ .desc("table name")
+ .hasArg()
+ .build()
+ );
+
+ return options;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = getOptions();
+
+ final CommandLineParser cmdLineParser = new DefaultParser();
+ CommandLine commandLine;
+ try {
+ commandLine = cmdLineParser.parse(options, args);
+ } catch (ParseException parseException) {
+ System.out.println(
+ "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+ + parseException);
+ printUsage(options);
+ return -1;
+ }
+ if (commandLine == null) {
+ System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
+ printUsage(options);
+ return -1;
+ }
+
+ String table = commandLine.getOptionValue("table");
+ int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+ int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+ int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
+ long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+ boolean dryRun = commandLine.hasOption("dryRun");
+ boolean skipWait = commandLine.hasOption("skipWait");
+
+ return compactRegionsTTLOnTable(HBaseConfiguration.create(), table, concurrency, sleep,
+ numServers, numRegions, dryRun, skipWait);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args);
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
index adecd5c..c125c6e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
@@ -17,11 +17,24 @@
*/
package org.apache.hadoop.hbase.util.compaction;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -39,24 +52,13 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@Category({SmallTests.class})
public class TestMajorCompactionRequest {
@@ -64,10 +66,10 @@ public class TestMajorCompactionRequest {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMajorCompactionRequest.class);
- private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
- private static final String FAMILY = "a";
- private Path rootRegionDir;
- private Path regionStoreDir;
+ protected static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+ protected static final String FAMILY = "a";
+ protected Path rootRegionDir;
+ protected Path regionStoreDir;
@Before public void setUp() throws Exception {
rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionRequest");
@@ -77,15 +79,15 @@ public class TestMajorCompactionRequest {
@Test public void testStoresNeedingCompaction() throws Exception {
// store files older than timestamp
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
- MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+ MajorCompactionRequest request = makeMockRequest(storeFiles, false);
Optional<MajorCompactionRequest> result =
- request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+ request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
assertTrue(result.isPresent());
// store files newer than timestamp
storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
- request = makeMockRequest(100, storeFiles, false);
- result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+ request = makeMockRequest(storeFiles, false);
+ result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
assertFalse(result.isPresent());
}
@@ -106,16 +108,17 @@ public class TestMajorCompactionRequest {
HRegionFileSystem fileSystem =
mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
- region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+ region.getRegionInfo(), Sets.newHashSet(FAMILY)));
doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
any(Path.class));
doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
- Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+ Set<String> result =
+ majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
assertEquals(FAMILY, Iterables.getOnlyElement(result));
}
- private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+ protected HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
List<StoreFileInfo> storeFiles) throws IOException {
long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
@@ -138,7 +141,7 @@ public class TestMajorCompactionRequest {
return mockSystem;
}
- private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+ protected List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
throws IOException {
List<StoreFileInfo> infos = Lists.newArrayList();
int i = 0;
@@ -153,14 +156,14 @@ public class TestMajorCompactionRequest {
return infos;
}
- private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+ private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
boolean references) throws IOException {
Configuration configuration = mock(Configuration.class);
RegionInfo regionInfo = mock(RegionInfo.class);
when(regionInfo.getEncodedName()).thenReturn("HBase");
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
MajorCompactionRequest request =
- new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+ new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"));
MajorCompactionRequest spy = spy(request);
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
new file mode 100644
index 0000000..f15b887
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category({SmallTests.class})
+public class TestMajorCompactionTTLRequest extends TestMajorCompactionRequest {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMajorCompactionTTLRequest.class);
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionTTLRequest");
+ regionStoreDir = new Path(rootRegionDir, FAMILY);
+ }
+
+ @Test
+ public void testStoresNeedingCompaction() throws Exception {
+ // store files older than timestamp 10
+ List<StoreFileInfo> storeFiles1 = mockStoreFiles(regionStoreDir, 5, 10);
+ // store files older than timestamp 100
+ List<StoreFileInfo> storeFiles2 = mockStoreFiles(regionStoreDir, 5, 100);
+ List<StoreFileInfo> storeFiles = Lists.newArrayList(storeFiles1);
+ storeFiles.addAll(storeFiles2);
+
+ MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
+ // All files are <= 100, so region should not be compacted.
+ Optional<MajorCompactionRequest> result =
+ request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
+ assertFalse(result.isPresent());
+
+ // All files are <= 100, so region should not be compacted yet.
+ result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
+ assertFalse(result.isPresent());
+
+ // All files are <= 100, so they should be considered for compaction
+ result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101);
+ assertTrue(result.isPresent());
+ }
+
+ private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
+ throws IOException {
+ Configuration configuration = mock(Configuration.class);
+ RegionInfo regionInfo = mock(RegionInfo.class);
+ when(regionInfo.getEncodedName()).thenReturn("HBase");
+ when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
+ MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo);
+ MajorCompactionTTLRequest spy = spy(request);
+ HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
+ doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
+ doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+ return spy;
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
index ccf0146..7ea80b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util.compaction;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -42,7 +43,8 @@ public class TestMajorCompactor {
HBaseClassTestRule.forClass(TestMajorCompactor.class);
public static final byte[] FAMILY = Bytes.toBytes("a");
- private HBaseTestingUtility utility;
+ protected HBaseTestingUtility utility;
+ protected Admin admin;
@Before public void setUp() throws Exception {
utility = new HBaseTestingUtility();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
similarity index 56%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
index ccf0146..44abda6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,70 +17,103 @@
*/
package org.apache.hadoop.hbase.util.compaction;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
+import org.junit.rules.TestName;
@Category({ MiscTests.class, MediumTests.class })
-public class TestMajorCompactor {
+public class TestMajorCompactorTTL extends TestMajorCompactor {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMajorCompactor.class);
+ HBaseClassTestRule.forClass(TestMajorCompactorTTL.class);
- public static final byte[] FAMILY = Bytes.toBytes("a");
- private HBaseTestingUtility utility;
+ @Rule
+ public TestName name = new TestName();
- @Before public void setUp() throws Exception {
+ @Before
+ @Override
+ public void setUp() throws Exception {
utility = new HBaseTestingUtility();
utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
utility.startMiniCluster();
+ admin = utility.getAdmin();
}
- @After public void tearDown() throws Exception {
+ @After
+ @Override
+ public void tearDown() throws Exception {
utility.shutdownMiniCluster();
}
- @Test public void testCompactingATable() throws Exception {
- TableName tableName = TableName.valueOf("TestMajorCompactor");
- utility.createMultiRegionTable(tableName, FAMILY, 5);
- utility.waitTableAvailable(tableName);
- Connection connection = utility.getConnection();
- Table table = connection.getTable(tableName);
- // write data and flush multiple store files:
- for (int i = 0; i < 5; i++) {
- utility.loadRandomRows(table, FAMILY, 50, 100);
- utility.flush(tableName);
- }
- table.close();
- int numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+ @Test
+ public void testCompactingATable() throws Exception {
+ TableName tableName = createTable(name.getMethodName());
+
+ // Delay a bit, so we can set the table TTL to 5 seconds
+ Thread.sleep(10 * 1000);
+
+ int numberOfRegions = admin.getRegions(tableName).size();
int numHFiles = utility.getNumHFiles(tableName, FAMILY);
// we should have a table with more store files than we would before we major compacted.
assertTrue(numberOfRegions < numHFiles);
+ modifyTTL(tableName);
- MajorCompactor compactor =
- new MajorCompactor(utility.getConfiguration(), tableName,
- Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
+ MajorCompactorTTL compactor = new MajorCompactorTTL(utility.getConfiguration(),
+ admin.getDescriptor(tableName), 1, 200);
compactor.initializeWorkQueues();
compactor.compactAllRegions();
compactor.shutdown();
// verify that the store has been completely major compacted.
- numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+ numberOfRegions = admin.getRegions(tableName).size();
numHFiles = utility.getNumHFiles(tableName, FAMILY);
- assertEquals(numHFiles, numberOfRegions);
+ assertEquals(numberOfRegions, numHFiles);
+ }
+
+ protected void modifyTTL(TableName tableName) throws IOException, InterruptedException {
+ // Set the TTL to 5 secs, so all the files just written above will get cleaned up on compact.
+ admin.disableTable(tableName);
+ utility.waitTableDisabled(tableName.getName());
+ TableDescriptor descriptor = admin.getDescriptor(tableName);
+ ColumnFamilyDescriptor colDesc = descriptor.getColumnFamily(FAMILY);
+ ColumnFamilyDescriptorBuilder cFDB = ColumnFamilyDescriptorBuilder.newBuilder(colDesc);
+ cFDB.setTimeToLive(5);
+ admin.modifyColumnFamily(tableName, cFDB.build());
+ admin.enableTable(tableName);
+ utility.waitTableEnabled(tableName);
+ }
+
+ protected TableName createTable(String name) throws IOException, InterruptedException {
+ TableName tableName = TableName.valueOf(name);
+ utility.createMultiRegionTable(tableName, FAMILY, 5);
+ utility.waitTableAvailable(tableName);
+ Connection connection = utility.getConnection();
+ Table table = connection.getTable(tableName);
+ // write data and flush multiple store files:
+ for (int i = 0; i < 5; i++) {
+ utility.loadRandomRows(table, FAMILY, 50, 100);
+ utility.flush(tableName);
+ }
+ table.close();
+ return tableName;
}
}
\ No newline at end of file