You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2018/01/31 18:18:41 UTC
hbase git commit: HBASE-19528 - Major Compaction Tool
Repository: hbase
Updated Branches:
refs/heads/master 7c318cead -> 4b3b627ab
HBASE-19528 - Major Compaction Tool
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4b3b627a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4b3b627a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4b3b627a
Branch: refs/heads/master
Commit: 4b3b627abe34f8426fddd914e941d12f79da0752
Parents: 7c318ce
Author: Rahul Gidwani <ch...@apache.org>
Authored: Thu Jan 25 12:47:50 2018 -0800
Committer: Rahul Gidwani <ch...@apache.org>
Committed: Wed Jan 31 10:18:03 2018 -0800
----------------------------------------------------------------------
.../compaction/ClusterCompactionQueues.java | 137 +++++++
.../util/compaction/MajorCompactionRequest.java | 171 +++++++++
.../hbase/util/compaction/MajorCompactor.java | 379 +++++++++++++++++++
.../compaction/MajorCompactionRequestTest.java | 166 ++++++++
.../util/compaction/MajorCompactorTest.java | 81 ++++
5 files changed, 934 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
new file mode 100644
index 0000000..c0d34d9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.Private
+class ClusterCompactionQueues {
+
+ private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues;
+ private final Set<ServerName> compactingServers;
+ private final ReadWriteLock lock;
+ private final int concurrentServers;
+
+  /**
+   * Creates an empty set of per-server compaction queues.
+   *
+   * @param concurrentServers threshold that {@link #atCapacity()} compares the number of
+   *   currently reserved servers against
+   */
+  ClusterCompactionQueues(int concurrentServers) {
+    this.lock = new ReentrantReadWriteLock();
+    this.compactionQueues = Maps.newHashMap();
+    this.compactingServers = Sets.newHashSet();
+    this.concurrentServers = concurrentServers;
+  }
+
+  /**
+   * Appends a request to the queue kept for the given server, creating that queue on first use.
+   * The update is performed while holding the write lock.
+   *
+   * @param serverName server whose queue receives the request
+   * @param info request to enqueue
+   */
+  void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) {
+    this.lock.writeLock().lock();
+    try {
+      this.compactionQueues.computeIfAbsent(serverName, ignored -> Lists.newArrayList())
+          .add(info);
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Reports whether at least one server queue still holds a request.
+   *
+   * @return {@code true} if any queue is non-empty, {@code false} otherwise
+   */
+  boolean hasWorkItems() {
+    lock.readLock().lock();
+    try {
+      for (List<MajorCompactionRequest> pending : this.compactionQueues.values()) {
+        if (!pending.isEmpty()) {
+          return true;
+        }
+      }
+      return false;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Counts the requests still waiting in all server queues.
+   *
+   * @return the sum of the sizes of every queue
+   */
+  int getCompactionRequestsLeftToFinish() {
+    lock.readLock().lock();
+    try {
+      return compactionQueues.values().stream().mapToInt(List::size).sum();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the queue stored for the given server.
+   *
+   * @param serverName server to look up
+   * @return the live queue for that server, or {@code null} if nothing was ever enqueued for it
+   */
+  @VisibleForTesting
+  List<MajorCompactionRequest> getQueue(ServerName serverName) {
+    lock.readLock().lock();
+    try {
+      List<MajorCompactionRequest> queue = compactionQueues.get(serverName);
+      return queue;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Removes the request at the head of the given server's queue and marks that server as
+   * compacting. The server stays marked until {@link #releaseCompaction(ServerName)} is called.
+   *
+   * @param serverName server to take a request from
+   * @return the reserved request, or {@code null} when the server has no queue or the queue is
+   *   empty; in that case the server is not marked as compacting
+   */
+  MajorCompactionRequest reserveForCompaction(ServerName serverName) {
+    lock.writeLock().lock();
+    try {
+      List<MajorCompactionRequest> queue = compactionQueues.get(serverName);
+      // A server nothing was ever enqueued for has no map entry; treat it like an empty queue
+      // instead of failing with a NullPointerException.
+      if (queue == null || queue.isEmpty()) {
+        return null;
+      }
+      compactingServers.add(serverName);
+      return queue.remove(0);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Clears the "compacting" mark of the given server so it can be selected again.
+   *
+   * @param serverName server to release
+   */
+  void releaseCompaction(ServerName serverName) {
+    this.lock.writeLock().lock();
+    try {
+      this.compactingServers.remove(serverName);
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Reports whether the number of servers marked as compacting has reached the configured limit.
+   *
+   * @return {@code true} when no further server should be reserved right now
+   */
+  boolean atCapacity() {
+    lock.readLock().lock();
+    try {
+      int busyServers = compactingServers.size();
+      return concurrentServers <= busyServers;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Picks the server with the most queued requests among the servers that are not currently
+   * marked as compacting.
+   * <p>
+   * Servers whose queue is empty are never returned: reserving from such a server yields no
+   * request, so offering it to the caller would only produce a {@code null} reservation.
+   *
+   * @return the selected server, or an empty {@link Optional} when every idle server has an
+   *   empty queue
+   */
+  Optional<ServerName> getLargestQueueFromServersNotCompacting() {
+    lock.readLock().lock();
+    try {
+      return compactionQueues.entrySet().stream()
+          .filter(entry -> !compactingServers.contains(entry.getKey()))
+          // skip drained queues so the caller never reserves from a server with no work
+          .filter(entry -> !entry.getValue().isEmpty())
+          .max(Map.Entry.comparingByValue(
+            (o1, o2) -> Integer.compare(o1.size(), o2.size()))).map(Map.Entry::getKey);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
new file mode 100644
index 0000000..51b2b9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.Private
+class MajorCompactionRequest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
+
+ private final Configuration configuration;
+ private final RegionInfo region;
+ private Set<String> stores;
+ private final long timestamp;
+
+  /**
+   * Builds a request for one region.
+   *
+   * @param configuration configuration later used to open a connection
+   * @param region region the request refers to
+   * @param stores column family names covered by the request
+   * @param timestamp cut-off that file modification times are compared against
+   */
+  @VisibleForTesting
+  MajorCompactionRequest(Configuration configuration, RegionInfo region,
+      Set<String> stores, long timestamp) {
+    this.timestamp = timestamp;
+    this.stores = stores;
+    this.region = region;
+    this.configuration = configuration;
+  }
+
+  /**
+   * Creates a request for the given region limited to the stores that actually need compacting.
+   *
+   * @param configuration configuration used to reach the cluster
+   * @param info region to examine
+   * @param stores candidate column family names
+   * @param timestamp cut-off that file modification times are compared against
+   * @return a request for the stores needing compaction, or empty if there are none
+   * @throws IOException if examining the stores fails
+   */
+  static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
+      Set<String> stores, long timestamp) throws IOException {
+    return new MajorCompactionRequest(configuration, info, stores, timestamp)
+        .createRequest(configuration, stores);
+  }
+
+  /** @return the region this request refers to */
+  RegionInfo getRegion() {
+    return this.region;
+  }
+
+  /** @return the column family names currently attached to this request */
+  Set<String> getStores() {
+    return this.stores;
+  }
+
+  /**
+   * Replaces the column family names attached to this request.
+   *
+   * @param stores new set of column family names
+   */
+  void setStores(Set<String> stores) {
+    this.stores = stores;
+  }
+
+  /**
+   * Narrows the given stores to those needing compaction and wraps them in a new request for
+   * this request's region and timestamp.
+   *
+   * @param configuration configuration handed to the new request
+   * @param stores candidate column family names
+   * @return a new request, or empty when no store needs compaction
+   * @throws IOException if examining the stores fails
+   */
+  @VisibleForTesting
+  Optional<MajorCompactionRequest> createRequest(Configuration configuration,
+      Set<String> stores) throws IOException {
+    Set<String> needed = getStoresRequiringCompaction(stores);
+    if (needed.isEmpty()) {
+      return Optional.empty();
+    }
+    return Optional.of(new MajorCompactionRequest(configuration, region, needed, timestamp));
+  }
+
+  /**
+   * Determines which of the requested stores (column families) of this region should be major
+   * compacted.
+   * <p>
+   * A store is selected when it has a reference file older than the request timestamp, or when
+   * any of its store files has a modification time older than the request timestamp. A store
+   * for which the region file system returns no store file collection is skipped.
+   *
+   * @param requestedStores column family names to examine
+   * @return the subset of {@code requestedStores} to compact; possibly empty, never {@code null}
+   * @throws IOException if the connection or the region file system cannot be accessed
+   */
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+    try (Connection connection = getConnection(configuration)) {
+      HRegionFileSystem fileSystem = getFileSystem(connection);
+      Set<String> familiesToCompact = Sets.newHashSet();
+      for (String family : requestedStores) {
+        // do we have any store files?
+        Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+        String encodedName = fileSystem.getRegionInfo().getEncodedName();
+        if (storeFiles == null) {
+          // The reason is part of the format string: passing it as a trailing argument without
+          // a placeholder makes SLF4J drop it silently.
+          LOG.info("Excluding store: {} for compaction for region: {} has no store files",
+            family, encodedName);
+          continue;
+        }
+        // check for reference files
+        if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+          familiesToCompact.add(family);
+          LOG.info("Including store: {} with: {} files for compaction for region: {}", family,
+            storeFiles.size(), encodedName);
+          continue;
+        }
+        // check store file timestamps
+        boolean includeStore = false;
+        for (StoreFileInfo storeFile : storeFiles) {
+          if (storeFile.getModificationTime() < timestamp) {
+            LOG.info("Including store: {} with: {} files for compaction for region: {}", family,
+              storeFiles.size(), encodedName);
+            familiesToCompact.add(family);
+            includeStore = true;
+            break;
+          }
+        }
+        if (!includeStore) {
+          LOG.info("Excluding store: {} for compaction for region: {} already compacted",
+            family, encodedName);
+        }
+      }
+      return familiesToCompact;
+    }
+  }
+
+  /**
+   * Opens a new cluster connection; kept as a separate method so tests can substitute it.
+   *
+   * @param configuration configuration describing the cluster
+   * @return a freshly created connection that the caller is responsible for closing
+   * @throws IOException if the connection cannot be created
+   */
+  @VisibleForTesting
+  Connection getConnection(Configuration configuration) throws IOException {
+    Connection created = ConnectionFactory.createConnection(configuration);
+    return created;
+  }
+
+  /**
+   * Checks whether the family directory holds a reference file whose modification time is older
+   * than the request timestamp.
+   *
+   * @param fileSystem region file system to inspect
+   * @param family column family name
+   * @return {@code true} as soon as one such reference file is found, {@code false} otherwise
+   * @throws IOException if listing or stat-ing the reference files fails
+   */
+  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+      throws IOException {
+    for (Path candidate : getReferenceFilePaths(fileSystem.getFileSystem(),
+      fileSystem.getStoreDir(family))) {
+      FileStatus candidateStatus = fileSystem.getFileSystem().getFileLinkStatus(candidate);
+      if (candidateStatus.getModificationTime() >= timestamp) {
+        // not older than the cut-off; keep looking
+        continue;
+      }
+      LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
+          .getRegionInfo().getEncodedName() + " (reference store files)");
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Lists the reference files of a family directory by delegating to
+   * {@code FSUtils.getReferenceFilePaths}; kept as a separate method so tests can substitute it.
+   *
+   * @param fileSystem file system holding the directory
+   * @param familyDir directory of the column family
+   * @return the paths returned by {@code FSUtils}
+   * @throws IOException if the listing fails
+   */
+  @VisibleForTesting
+  List<Path> getReferenceFilePaths(FileSystem fileSystem, Path familyDir)
+      throws IOException {
+    List<Path> referencePaths = FSUtils.getReferenceFilePaths(fileSystem, familyDir);
+    return referencePaths;
+  }
+
+  /**
+   * Opens the on-disk view of this request's region, read-only, using the configuration exposed
+   * by the connection's {@link Admin}.
+   *
+   * @param connection connection used to obtain the configuration
+   * @return the region file system for this request's region
+   * @throws IOException if the region cannot be opened or the {@link Admin} cannot be closed
+   */
+  @VisibleForTesting
+  HRegionFileSystem getFileSystem(Connection connection) throws IOException {
+    // Admin is only needed for its configuration; close it instead of leaking one per call.
+    try (Admin admin = connection.getAdmin()) {
+      Configuration conf = admin.getConfiguration();
+      return HRegionFileSystem.openRegionFromFileSystem(conf,
+        FSUtils.getCurrentFileSystem(conf),
+        FSUtils.getTableDir(FSUtils.getRootDir(conf), region.getTable()),
+        region, true);
+    }
+  }
+
+ @Override
+ public String toString() {
+ return "region: " + region.getEncodedName() + " store(s): " + stores;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
new file mode 100644
index 0000000..c3372bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
+ private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
+
+ private final ClusterCompactionQueues clusterCompactionQueues;
+ private final long timestamp;
+ private final Set<String> storesToCompact;
+ private final ExecutorService executor;
+ private final long sleepForMs;
+ private final Connection connection;
+ private final TableName tableName;
+
+  /**
+   * Creates a compactor for one table. A cluster connection and a fixed-size thread pool are
+   * created here and stay open until {@link #shutdown()}.
+   *
+   * @param conf configuration used to create the connection
+   * @param tableName table to compact
+   * @param storesToCompact column family names to compact
+   * @param concurrency size of the worker pool and limit handed to the compaction queues
+   * @param timestamp cut-off that file modification times are compared against
+   * @param sleepForMs pause, in milliseconds, between polling attempts
+   * @throws IOException if the connection cannot be created
+   */
+  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
+      int concurrency, long timestamp, long sleepForMs) throws IOException {
+    // resources first, in the same order as before, then the plain value fields
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.executor = Executors.newFixedThreadPool(concurrency);
+    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+    this.tableName = tableName;
+    this.storesToCompact = storesToCompact;
+    this.timestamp = timestamp;
+    this.sleepForMs = sleepForMs;
+  }
+
+  /**
+   * Drains the compaction queues: repeatedly picks an idle server, reserves its next request and
+   * submits it to the worker pool, until no request is queued and every submitted task is done.
+   * <p>
+   * Before a request is submitted the current location of its region is looked up; if the
+   * region now lives on a different server the request is re-queued under that server instead.
+   *
+   * @throws Exception if a region lookup fails or the thread is interrupted while sleeping
+   */
+  public void compactAllRegions() throws Exception {
+    List<Future<?>> futures = Lists.newArrayList();
+    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
+      while (clusterCompactionQueues.atCapacity()) {
+        LOG.debug("Waiting for servers to complete Compactions");
+        Thread.sleep(sleepForMs);
+      }
+      Optional<ServerName> serverToProcess =
+          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
+      MajorCompactionRequest request = null;
+      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
+        request = clusterCompactionQueues.reserveForCompaction(serverToProcess.get());
+      }
+      if (request == null) {
+        // Nothing was assigned: either no idle server exists, or the chosen idle server has an
+        // empty queue while the remaining work belongs to servers that are still compacting.
+        // reserveForCompaction returns null in that case, so sleep rather than dereference it.
+        Thread.sleep(sleepForMs);
+        continue;
+      }
+      ServerName serverName = serverToProcess.get();
+      // check to see if the region has moved... if so we have to enqueue it again with
+      // the proper serverName
+      ServerName currentServer = connection.getRegionLocator(tableName)
+          .getRegionLocation(request.getRegion().getStartKey()).getServerName();
+
+      if (!currentServer.equals(serverName)) {
+        // add it back to the queue with the correct server it should be picked up in the future.
+        LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
+            + serverName + " to: " + currentServer + " re-queuing request");
+        clusterCompactionQueues.addToCompactionQueue(currentServer, request);
+        clusterCompactionQueues.releaseCompaction(serverName);
+      } else {
+        LOG.info("Firing off compaction request for server: " + serverName + ", " + request
+            + " total queue size left: " + clusterCompactionQueues
+            .getCompactionRequestsLeftToFinish());
+        futures.add(executor.submit(new Compact(serverName, request)));
+      }
+    }
+    LOG.info("All compactions have completed");
+  }
+
+  /**
+   * Drops finished tasks from the given list and reports whether none remain.
+   *
+   * @param futures tracked tasks; finished entries are removed from this list
+   * @return {@code true} when the list is empty after pruning
+   */
+  private boolean futuresComplete(List<Future<?>> futures) {
+    futures.removeIf(task -> task.isDone());
+    boolean nothingPending = futures.isEmpty();
+    return nothingPending;
+  }
+
+  /**
+   * Stops the worker pool, waits for running tasks to finish, reports the outcome and closes the
+   * cluster connection.
+   * <p>
+   * The success message is logged only when no request was recorded as failed; otherwise the
+   * failed requests are listed at error level. The connection is closed even if waiting for the
+   * pool is interrupted.
+   *
+   * @throws Exception if waiting for the pool is interrupted or closing the connection fails
+   */
+  public void shutdown() throws Exception {
+    try {
+      executor.shutdown();
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+      if (ERRORS.isEmpty()) {
+        LOG.info("All regions major compacted successfully");
+      } else {
+        StringBuilder builder =
+            new StringBuilder().append("Major compaction failed, there were: ")
+                .append(ERRORS.size())
+                .append(" regions / stores that failed compacting\n")
+                .append("Failed compaction requests\n").append("--------------------------\n")
+                .append(Joiner.on("\n").join(ERRORS));
+        LOG.error(builder.toString());
+      }
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  /**
+   * Fills the per-server queues with one request for every region of the table that has stores
+   * needing compaction. When no column family was given, all families of the table are used.
+   *
+   * @throws IOException if the table descriptor, the region locations or the store files cannot
+   *   be read
+   */
+  @VisibleForTesting
+  void initializeWorkQueues() throws IOException {
+    if (storesToCompact.isEmpty()) {
+      for (byte[] familyName : connection.getTable(tableName).getDescriptor()
+          .getColumnFamilyNames()) {
+        storesToCompact.add(Bytes.toString(familyName));
+      }
+      LOG.info("No family specified, will execute for all families");
+    }
+    LOG.info(
+        "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+    for (HRegionLocation location :
+        connection.getRegionLocator(tableName).getAllRegionLocations()) {
+      Optional<MajorCompactionRequest> candidate = MajorCompactionRequest.newRequest(
+          connection.getConfiguration(), location.getRegion(), storesToCompact, timestamp);
+      if (candidate.isPresent()) {
+        clusterCompactionQueues.addToCompactionQueue(location.getServerName(), candidate.get());
+      }
+    }
+  }
+
+ class Compact implements Runnable {
+
+ private final ServerName serverName;
+ private final MajorCompactionRequest request;
+
+    /**
+     * @param serverName server that was reserved for this task and is released when it ends
+     * @param request request to execute
+     */
+    Compact(ServerName serverName, MajorCompactionRequest request) {
+      this.request = request;
+      this.serverName = serverName;
+    }
+
+    /**
+     * Runs the compaction for this task's request. A region that is no longer served triggers a
+     * refresh of the queues; any other failure is logged. The reserved server is released in
+     * every case.
+     */
+    @Override
+    public void run() {
+      try {
+        this.compactAndWait(this.request);
+      } catch (NotServingRegionException e) {
+        // this region has split or merged
+        LOG.warn("Region is invalid, requesting updated regions", e);
+        // update the cluster compaction queues with the newly created regions
+        MajorCompactor.this.addNewRegions();
+      } catch (Exception e) {
+        LOG.warn("Error compacting:", e);
+      } finally {
+        MajorCompactor.this.clusterCompactionQueues.releaseCompaction(this.serverName);
+      }
+    }
+
+    /**
+     * Requests a major compaction for every store of the request that still needs one, waits
+     * until the region stops reporting a major compaction, and then verifies the result.
+     * <p>
+     * After the wait, the stores are re-examined. If some still need compaction and the region
+     * is still on this task's server, the request is recorded as failed; if the region has
+     * moved, the request is queued again with the remaining stores.
+     *
+     * @param request request to execute; its store set may be replaced
+     * @throws Exception if talking to the cluster fails or the thread is interrupted
+     */
+    void compactAndWait(MajorCompactionRequest request) throws Exception {
+      // try-with-resources releases the Admin; it is closed before the finally block runs
+      try (Admin admin = connection.getAdmin()) {
+        // only make the request if the region is not already major compacting
+        if (!isCompacting(request)) {
+          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          if (!stores.isEmpty()) {
+            request.setStores(stores);
+            for (String store : request.getStores()) {
+              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+                  Bytes.toBytes(store));
+            }
+          }
+        }
+        while (isCompacting(request)) {
+          Thread.sleep(sleepForMs);
+          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
+              .getEncodedName());
+        }
+      } finally {
+        // Make sure to wait for the CompactedFileDischarger chore to do its work
+        int waitForArchive = connection.getConfiguration()
+            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+        Thread.sleep(waitForArchive);
+        // check if compaction completed successfully, otherwise put that request back in the
+        // proper queue
+        Set<String> storesRequiringCompaction =
+            request.getStoresRequiringCompaction(storesToCompact);
+        if (!storesRequiringCompaction.isEmpty()) {
+          // this happens, when a region server is marked as dead, flushes a store file and
+          // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
+          // thus you have more store files on the filesystem than the regionserver knows about.
+          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+              .equals(serverName);
+          if (regionHasNotMoved) {
+            LOG.error("Not all store files were compacted, this may be due to the regionserver not "
+                + "being aware of all store files. Will not reattempt compacting, " + request);
+            ERRORS.add(request);
+          } else {
+            request.setStores(storesRequiringCompaction);
+            clusterCompactionQueues.addToCompactionQueue(serverName, request);
+            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+                + " region: " + request.getRegion().getEncodedName());
+          }
+        } else {
+          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+              + " -> cf(s): " + request.getStores());
+        }
+      }
+    }
+ }
+
+  /**
+   * Asks the cluster whether the request's region is currently running a major compaction.
+   *
+   * @param request request whose region is checked
+   * @return {@code true} when the reported state is {@code MAJOR} or {@code MAJOR_AND_MINOR}
+   * @throws Exception if the compaction state cannot be fetched
+   */
+  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
+    // This method is polled in a loop, so the Admin must be closed on every call.
+    try (Admin admin = connection.getAdmin()) {
+      CompactionState compactionState =
+          admin.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
+      // identity comparison on the enum also tolerates a null state
+      return compactionState == CompactionState.MAJOR
+          || compactionState == CompactionState.MAJOR_AND_MINOR;
+    }
+  }
+
+  /**
+   * Re-reads the table's region locations and queues a request for every region whose region id
+   * is greater than the request timestamp and that has stores needing compaction. An
+   * {@link IOException} is rethrown wrapped in a {@link RuntimeException}.
+   */
+  private void addNewRegions() {
+    try {
+      for (HRegionLocation location :
+          connection.getRegionLocator(tableName).getAllRegionLocations()) {
+        if (location.getRegion().getRegionId() <= timestamp) {
+          // region id is not newer than the cut-off
+          continue;
+        }
+        Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest.newRequest(
+            connection.getConfiguration(), location.getRegion(), storesToCompact, timestamp);
+        if (compactionRequest.isPresent()) {
+          clusterCompactionQueues.addToCompactionQueue(location.getServerName(),
+              compactionRequest.get());
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Command-line entry point. Parses the options, builds a {@link MajorCompactor}, fills its
+   * queues and, unless {@code -dryRun} is given, runs the compactions.
+   * <p>
+   * When the arguments cannot be parsed, the error and the usage text are printed and the method
+   * returns without touching the cluster. The compactor is always shut down once it has been
+   * created, so its worker threads and connection are released even if a step fails.
+   *
+   * @param args command-line arguments; {@code -table} and {@code -servers} are required
+   * @throws Exception if a numeric option is malformed or any cluster operation fails
+   */
+  public static void main(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("cf")
+            .optionalArg(true)
+            .desc("column families: comma separated eg: a,b,c")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("servers")
+            .required()
+            .desc("Concurrent servers compacting")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("minModTime").
+            desc("Compact if store files have modification time < minModTime")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("zk")
+            .optionalArg(true)
+            .desc("zk quorum")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("rootDir")
+            .optionalArg(true)
+            .desc("hbase.rootDir")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("sleep")
+            .desc("Time to sleepForMs (ms) for checking compaction status per region and available "
+                + "work queues: default 30s")
+            .hasArg()
+            .build()
+    );
+    // NOTE(review): "retries" is accepted on the command line but its value is never read below.
+    options.addOption(
+        Option.builder("retries")
+            .desc("Max # of retries for a compaction request," + " defaults to 3")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("dryRun")
+            .desc("Dry run, will just output a list of regions that require compaction based on "
+                + "parameters passed")
+            .hasArg(false)
+            .build()
+    );
+
+    final CommandLineParser cmdLineParser = new DefaultParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.out.println(
+          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+              + parseException);
+      printUsage(options);
+      // commandLine is still null here; continuing would fail with a NullPointerException
+      return;
+    }
+    String tableName = commandLine.getOptionValue("table");
+    String cf = commandLine.getOptionValue("cf", null);
+    Set<String> families = Sets.newHashSet();
+    if (cf != null) {
+      Iterables.addAll(families, Splitter.on(",").split(cf));
+    }
+
+    Configuration configuration = HBaseConfiguration.create();
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
+    long minModTime = Long.parseLong(
+        commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
+    String quorum =
+        commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
+    String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+
+    configuration.set(HConstants.HBASE_DIR, rootDir);
+    configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
+
+    MajorCompactor compactor =
+        new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
+            minModTime, sleep);
+
+    try {
+      compactor.initializeWorkQueues();
+      if (!commandLine.hasOption("dryRun")) {
+        compactor.compactAllRegions();
+      }
+    } finally {
+      // always stop the worker pool and close the connection, even when a step above throws;
+      // otherwise the pool's non-daemon threads keep the JVM alive
+      compactor.shutdown();
+    }
+  }
+
+  /**
+   * Prints the usage text for the given options through a {@link HelpFormatter}.
+   *
+   * @param options options to describe
+   */
+  private static void printUsage(final Options options) {
+    new HelpFormatter().printHelp(MajorCompactor.class.getSimpleName(),
+        "\nUsage instructions\n\n", options, "\n", true);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
new file mode 100644
index 0000000..c5ce4e3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@Category({SmallTests.class})
+public class MajorCompactionRequestTest {
+
+ private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+ private static final String FAMILY = "a";
+ private Path rootRegionDir;
+ private Path regionStoreDir;
+
+ @Before public void setUp() throws Exception {
+ rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest");
+ regionStoreDir = new Path(rootRegionDir, FAMILY);
+ }
+
+ @Test public void testStoresNeedingCompaction() throws Exception {
+ // store files older than timestamp
+ List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
+ MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+ Optional<MajorCompactionRequest> result =
+ request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+ assertTrue(result.isPresent());
+
+ // store files newer than timestamp
+ storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
+ request = makeMockRequest(100, storeFiles, false);
+ result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+ assertFalse(result.isPresent());
+ }
+
+ @Test public void testIfWeHaveNewReferenceFilesButOldStoreFiles() throws Exception {
+ // this tests that reference files that are new, but have older timestamps for the files
+ // they reference still will get compacted.
+ TableName table = TableName.valueOf("MajorCompactorTest");
+ TableDescriptor htd = UTILITY.createTableDescriptor(table, Bytes.toBytes(FAMILY));
+ RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+ HRegion region =
+ HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(), htd);
+
+ Configuration configuration = mock(Configuration.class);
+ // the reference file timestamp is newer
+ List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
+ List<Path> paths = storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList());
+ // the files that are referenced are older, thus we still compact.
+ HRegionFileSystem fileSystem =
+ mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
+ MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
+ region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+ doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
+ doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
+ any(Path.class));
+ doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
+ Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+ assertEquals(FAMILY, Iterables.getOnlyElement(result));
+ }
+
+ private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+ List<StoreFileInfo> storeFiles) throws IOException {
+ long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
+ return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
+ }
+
+ private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+ List<StoreFileInfo> storeFiles, long referenceFileTimestamp) throws IOException {
+ FileSystem fileSystem = mock(FileSystem.class);
+ if (hasReferenceFiles) {
+ FileStatus fileStatus = mock(FileStatus.class);
+ doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime();
+ doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class));
+ }
+ HRegionFileSystem mockSystem = mock(HRegionFileSystem.class);
+ doReturn(info).when(mockSystem).getRegionInfo();
+ doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY);
+ doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString());
+ doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString());
+ doReturn(fileSystem).when(mockSystem).getFileSystem();
+ return mockSystem;
+ }
+
+ private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+ throws IOException {
+ List<StoreFileInfo> infos = Lists.newArrayList();
+ int i = 0;
+ while (i < howMany) {
+ StoreFileInfo storeFileInfo = mock(StoreFileInfo.class);
+ doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime();
+ doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10))).when(storeFileInfo)
+ .getPath();
+ infos.add(storeFileInfo);
+ i++;
+ }
+ return infos;
+ }
+
+ private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+ boolean references) throws IOException {
+ Configuration configuration = mock(Configuration.class);
+ RegionInfo regionInfo = mock(RegionInfo.class);
+ when(regionInfo.getEncodedName()).thenReturn("HBase");
+ when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
+ MajorCompactionRequest request =
+ new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+ MajorCompactionRequest spy = spy(request);
+ HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
+ doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
+ doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+ return spy;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
new file mode 100644
index 0000000..3fb37ec
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+/**
+ * Mini-cluster test for {@link MajorCompactor}: loads and flushes a multi-region table several
+ * times, runs the compactor over it and checks the resulting number of HFiles.
+ */
+@Category({ MiscTests.class, MediumTests.class })
+public class MajorCompactorTest {
+
+  public static final byte[] FAMILY = Bytes.toBytes("a");
+  private HBaseTestingUtility utility;
+
+  @Before
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  /**
+   * After five load/flush rounds the table must hold more HFiles than regions; after the
+   * compactor has run, the number of HFiles must equal the number of regions.
+   */
+  @Test
+  public void testCompactingATable() throws Exception {
+    TableName tableName = TableName.valueOf("MajorCompactorTest");
+    utility.createMultiRegionTable(tableName, FAMILY, 5);
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    // try-with-resources closes the table even when loading or flushing fails.
+    try (Table table = connection.getTable(tableName)) {
+      // write data and flush multiple store files:
+      for (int i = 0; i < 5; i++) {
+        utility.loadRandomRows(table, FAMILY, 50, 100);
+        utility.flush(tableName);
+      }
+    }
+    int numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+    int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // before major compacting, the table should have more store files than regions.
+    assertTrue(numberOfRegions < numHFiles);
+
+    MajorCompactor compactor =
+        new MajorCompactor(utility.getConfiguration(), tableName,
+            Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
+    compactor.initializeWorkQueues();
+    compactor.compactAllRegions();
+    compactor.shutdown();
+
+    // verify that the store has been completely major compacted.
+    numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // expected value first, so a failure reports the region count as the expectation.
+    assertEquals(numberOfRegions, numHFiles);
+  }
+}
\ No newline at end of file