Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/07/01 15:13:59 UTC

[GitHub] [hbase] Apache9 commented on a change in pull request #3425: HBASE-25991 Do compaction on compaction server

Apache9 commented on a change in pull request #3425:
URL: https://github.com/apache/hbase/pull/3425#discussion_r662375310



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,441 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuses {@link HStore#selectCompaction}, {@link HStore#throttleCompaction},
+ * {@link CompactionContext#compact}, and {@link CompactThreadControl}, which form the core logic
+ * of compaction.
+ */
 @InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
   private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+  // Configuration key for the large compaction threads.
+  private final static String LARGE_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.large";
+  private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+  // Configuration key for the small compaction threads.
+  private final static String SMALL_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.small";
+  private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
 
   private final Configuration conf;
-  private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
-      new ConcurrentHashMap<>();
   private final HCompactionServer server;
+  private HFileSystem fs;
+  private Path rootDir;
+  private FSTableDescriptors tableDescriptors;
+  private CompactThreadControl compactThreadControl;
+  private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+      new ConcurrentHashMap<>();
+  private static CompactionServerStorage storage = new CompactionServerStorage();
 
-  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+  CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
+    try {
+      this.fs = new HFileSystem(this.conf, true);
+      this.rootDir = CommonFSUtils.getRootDir(this.conf);
+      this.tableDescriptors = new FSTableDescriptors(conf);
+      int largeThreads =
+          Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+      int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+      compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+          COMPACTION_TASK_COMPARATOR, REJECTION);
+    } catch (Throwable t) {
+      LOG.error("Failed construction CompactionThreadManager", t);
+    }
   }
 
-  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
-    AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
-    if (admin == null) {
-      LOG.debug("New RS admin connection to {}", sn);
-      admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
-      this.rsAdmins.put(sn, admin);
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return server.getChoreService();
+  }
+
+  @Override
+  public double getCompactionPressure() {
+    double max = 0;
+    for (CompactionTask task : getRunningCompactionTasks().values()) {
+      double normCount = task.getStore().getCompactionPressure();
+      if (normCount > max) {
+        max = normCount;
+      }
+    }
+    return max;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  public void requestCompaction(CompactionTask compactionTask) {
+    try {
+      selectFileAndExecuteTask(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Failed requestCompaction {}", compactionTask, e);
+    }
+  }
+
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+    ServerName rsServerName = compactionTask.getRsServerName();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    String logStr = compactionTask.toString();
+    MonitoredTask status =
+        TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+            + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+    status.enableStatusJournal(false);
+    // 1. select compaction and check compaction context is present
+    LOG.info("Start select compaction {}", compactionTask);
+    status.setStatus("Start select compaction");
+    Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+      compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+    HStore store = pair.getFirst();
+    Optional<CompactionContext> compaction = pair.getSecond();
+    if (!compaction.isPresent()) {
+      store.close();
+      LOG.info("Compaction context is empty: {}", compactionTask);
+      status.abort("Compaction context is empty and return");
+      return;
+    }
+    CompactionContext compactionContext = compaction.get();
+    // 2. update storage
+    Pair<Boolean, List<String>> updateStoreResult =
+        updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+    if (!updateStoreResult.getFirst()) {
+      store.close();
+      return;
+    }
+    List<String> selectedFileNames = updateStoreResult.getSecond();
+    compactionTask.setHStore(store);
+    compactionTask.setCompactionContext(compactionContext);
+    compactionTask.setSelectedFileNames(selectedFileNames);
+    compactionTask.setMonitoredTask(status);
+    compactionTask.setPriority(compactionContext.getRequest().getPriority());
+    // 3. execute a compaction task
+    ThreadPoolExecutor pool;
+    pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+        ? compactThreadControl.getLongCompactions()
+        : compactThreadControl.getShortCompactions();
+    pool.submit(new CompactionTaskRunner(compactionTask));
+  }
+
+  /**
+   * Open store, and select compaction context
+   * @return Store and CompactionContext
+   */
+  Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+      throws IOException {
+    status.setStatus("Open store");
+    tableDescriptors.get(regionInfo.getTable());
+    HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+      regionInfo, cfd.getNameAsString());
+
+    // CompactedHFilesDischarger only runs on the regionserver, so the compactionserver has no
+    // opportunity to clean up compacted files at that time; we clean them up here
+    storage.cleanupCompactedFiles(regionInfo, cfd,
+      store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+    if (major) {
+      status.setStatus("Trigger major compaction");
+      store.triggerMajorCompaction();
+    }
+    // get current compacting and compacted files, NOTE: these files are file names only, don't
+    // include paths.
+    status.setStatus("Get current compacting and compacted files from storage");
+    Set<String> excludeFiles = new HashSet<>();
+    Set<String> compactingFiles = storage.getSelectedStoreFiles(regionInfo, cfd);
+    synchronized (compactingFiles) {

Review comment:
       It is a bit strange that we need to synchronize on a returned collection; not a very good design, I'd say...
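
       For illustration, a minimal sketch of the encapsulation this comment hints at: keep the selected-file set private to the storage class and expose one atomic check-and-track operation, so callers never lock a collection they do not own. All names here (SelectedFilesTracker, trackIfAbsent, untrack) are hypothetical and not part of the patch.

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical alternative to getSelectedStoreFiles(): the set never escapes
// this class, so no caller ever needs to synchronize on it.
final class SelectedFilesTracker {
  private final Set<String> selected = new HashSet<>();

  // Atomically checks for overlap and tracks the candidates; returns false and
  // leaves the tracker unchanged if any candidate is already selected.
  synchronized boolean trackIfAbsent(List<String> candidateFileNames) {
    if (!Collections.disjoint(selected, candidateFileNames)) {
      return false;
    }
    selected.addAll(candidateFileNames);
    return true;
  }

  // Removes files once their compaction finishes or is aborted.
  synchronized void untrack(List<String> fileNames) {
    selected.removeAll(fileNames);
  }
}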

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,483 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
 @InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
   private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+  // Configuration key for the large compaction threads.
+  private final static String LARGE_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.large";
+  private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+  // Configuration key for the small compaction threads.
+  private final static String SMALL_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.small";
+  private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
 
   private final Configuration conf;
   private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
       new ConcurrentHashMap<>();
   private final HCompactionServer server;
+  private HFileSystem fs;
+  private Path rootDir;
+  private FSTableDescriptors tableDescriptors;
+  // compaction pools
+  private volatile ThreadPoolExecutor longCompactions;
+  private volatile ThreadPoolExecutor shortCompactions;
+  private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+      new ConcurrentHashMap<>();
+  private PressureAwareCompactionThroughputController throughputController;
+  private CompactionServerStorage storage = new CompactionServerStorage();
 
-  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+  CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
+    try {
+      this.fs = new HFileSystem(this.conf, true);
+      this.rootDir = CommonFSUtils.getRootDir(this.conf);
+      this.tableDescriptors = new FSTableDescriptors(conf);
+      // start compaction resources
+      this.throughputController = new PressureAwareCompactionThroughputController();
+      this.throughputController.setConf(conf);
+      this.throughputController.setup(this);
+      startCompactionPool();
+    } catch (Throwable t) {
+      LOG.error("Failed construction CompactionThreadManager", t);
+    }
+  }
+
+  private void startCompactionPool() {
+    final String n = Thread.currentThread().getName();
+    // threads pool used to execute short and long compactions
+    int largeThreads =
+        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+    StealJobQueue<Runnable> stealJobQueue =
+        new StealJobQueue<>(largeThreads, smallThreads, COMPACTION_TASK_COMPARATOR);
+    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
+        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
+            .setDaemon(true).build());
+    this.longCompactions.setRejectedExecutionHandler(new Rejection());
+    this.longCompactions.prestartAllCoreThreads();
+    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
+        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
+            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
+    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return server.getChoreService();
+  }
+
+  @Override
+  public double getCompactionPressure() {
+    double max = 0;
+    for (CompactionTask task : getRunningCompactionTasks().values()) {
+      double normCount = task.getStore().getCompactionPressure();
+      if (normCount > max) {
+        max = normCount;
+      }
+    }
+    return max;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  public void requestCompaction(CompactionTask compactionTask) {
+    try {
+      selectFileAndExecuteTask(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Failed requestCompaction {}", compactionTask, e);
+    }
+  }
+
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+    ServerName rsServerName = compactionTask.getRsServerName();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    String logStr = compactionTask.toString();
+    MonitoredTask status =
+        TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+            + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+    status.enableStatusJournal(false);
+    // 1. select compaction and check compaction context is present
+    LOG.info("Start select compaction {}", compactionTask);
+    status.setStatus("Start select compaction");
+    Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+      compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+    HStore store = pair.getFirst();
+    Optional<CompactionContext> compaction = pair.getSecond();
+    if (!compaction.isPresent()) {
+      store.close();
+      LOG.info("Compaction context is empty: {}", compactionTask);
+      status.abort("Compaction context is empty and return");
+      return;
+    }
+    CompactionContext compactionContext = compaction.get();
+    // 2. update storage
+    Pair<Boolean, List<String>> updateStoreResult =
+        updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+    if (!updateStoreResult.getFirst()) {
+      store.close();
+      return;
+    }
+    List<String> selectedFileNames = updateStoreResult.getSecond();
+    compactionTask.setHStore(store);
+    compactionTask.setCompactionContext(compactionContext);
+    compactionTask.setSelectedFileNames(selectedFileNames);
+    compactionTask.setMonitoredTask(status);
+    // 3. execute a compaction task
+    ThreadPoolExecutor pool;
+    pool = store.throttleCompaction(compactionContext.getRequest().getSize()) ? longCompactions
+        : shortCompactions;
+    pool.submit(new CompactionTaskRunner(compactionTask));
+  }
+
+  /**
+   * Open store, and select compaction context
+   * @return Store and CompactionContext
+   */
+  private Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+    ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+      throws IOException {
+    status.setStatus("Open store");
+    tableDescriptors.get(regionInfo.getTable());
+    HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+      regionInfo, cfd.getNameAsString());
+    storage.cleanupCompactedFiles(regionInfo, cfd,

Review comment:
       So I suppose this should be done by the region server, not by the compaction server?
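
       For context, on a plain region server this cleanup is driven by the CompactedHFilesDischarger chore, which is what the comment points at; the patch replicates it on the compaction server because that chore never runs there. Below is a rough sketch of the region-server-side wiring, assuming the 2.x constructor and config key (verify against the actual HRegionServer code before relying on it).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

// Illustrative sketch: schedule the discharger so compacted files are cleaned
// up where the region is online, rather than on the compaction server.
final class DischargerWiring {
  static void schedule(Configuration conf, ChoreService choreService,
      Stoppable stopper, RegionServerServices services) {
    // Assumed config key and default, mirroring HRegionServer.
    int periodMs = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    choreService.scheduleChore(new CompactedHFilesDischarger(periodMs, stopper, services));
  }
}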



