Posted to commits@hive.apache.org by kr...@apache.org on 2023/01/10 07:30:37 UTC

[hive] branch master updated: HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)

This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 919e734d3b6 HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)
919e734d3b6 is described below

commit 919e734d3b67902a721afab1ca9a803855dbf2ec
Author: veghlaci05 <lv...@cloudera.com>
AuthorDate: Tue Jan 10 08:30:24 2023 +0100

    HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)
---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 52 +++++++++++++++-
 .../hadoop/hive/ql/txn/compactor/Worker.java       |  3 +-
 .../hive/ql/txn/compactor/TestInitiator.java       | 70 ++++++++++++++++++++++
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  9 +++
 4 files changed, 131 insertions(+), 3 deletions(-)
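
Note: the core of this change is a relative standard deviation (RSD) check over
per-bucket file sizes. A minimal standalone sketch of that decision (illustrative
Java only; the class and method names are stand-ins, not the committed Hive code):

    import java.util.Map;

    public class RsdSketch {
      // True when the spread of bucket sizes justifies a rebalance: the standard
      // deviation of the bucket sizes exceeds rsdThreshold * mean bucket size.
      static boolean shouldRebalance(Map<Integer, Long> bucketSizes, double rsdThreshold) {
        if (bucketSizes.isEmpty()) {
          return false;
        }
        double mean = bucketSizes.values().stream().mapToLong(Long::longValue).sum()
            / (double) bucketSizes.size();
        double variance = bucketSizes.values().stream()
            .mapToDouble(s -> Math.pow(s - mean, 2))
            .sum() / bucketSizes.size();
        return Math.sqrt(variance) > mean * rsdThreshold;
      }
    }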

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 612cd492ca1..96a778dfcb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -479,8 +479,8 @@ public class Initiator extends MetaStoreCompactorThread {
     return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
   }
 
-  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, Map<String,
-      String> tblproperties, long baseSize, long deltaSize) {
+  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+                                                 Map<String, String> tblproperties, long baseSize, long deltaSize) {
     boolean noBase = false;
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     if (baseSize == 0 && deltaSize > 0) {
@@ -520,6 +520,10 @@ public class Initiator extends MetaStoreCompactorThread {
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    if (scheduleRebalance(ci, dir, tblproperties, baseSize, deltaSize)) {
+      return CompactionType.REBALANCE;
+    }
+
     String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
         HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
     int deltaNumThreshold = deltaNumProp == null ?
@@ -539,6 +543,50 @@ public class Initiator extends MetaStoreCompactorThread {
             CompactionType.MAJOR : CompactionType.MINOR;
   }
 
+  private boolean scheduleRebalance(CompactionInfo ci, AcidDirectory dir, Map<String, String> tblproperties, long baseSize, long deltaSize) {
+    // Bucket size calculation can be resource-intensive when there are numerous deltas, so we check for
+    // rebalance compaction only if the table is in acceptable shape: no major compaction required. This
+    // means the number of files should not be too high.
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .map(num -> Math.pow(num - mean, 2)).sum() / bucketSizes.size());
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("Initiating REBALANCE compaction on table {}", ci.tableName);
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.error("Error occured during checking bucket file sizes, rebalance threshold calculation is skipped.", e);
+        }
+      } else {
+        LOG.debug("Table is smaller than the minimum required size for REBALANCE compaction.");
+      }
+    }
+    return false;
+  }
+
   private long getBaseSize(AcidDirectory dir) throws IOException {
     long baseSize = 0;
     if (dir.getBase() != null) {
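
As a worked example of the threshold above (illustrative numbers): four buckets of
100, 100, 100 and 200 units have a mean of 125 and a standard deviation of
sqrt((3*25^2 + 75^2)/4) ~ 43.3. With the default threshold of 0.2 the cut-off is
0.2 * 125 = 25, so 43.3 > 25 and a rebalancing compaction would be initiated.
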
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index e48b7488e67..7fa1cfca97b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -169,7 +169,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     boolean isEnoughToCompact;
 
     if (ci.isRebalanceCompaction()) {
-      //TODO: For now, we are allowing rebalance compaction regardless of the table state. Thresholds will be added later.
+      //Although thresholds are used to schedule REBALANCE compactions automatically, manual triggering is always
+      //allowed if the table and query engine support it.
       return true;
     } else if (ci.isMajorCompaction()) {
       isEnoughToCompact =
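
Note on the Worker change above: the thresholds apply only to automatic initiation
by the Initiator; an explicitly requested rebalance (e.g. via
ALTER TABLE ... COMPACT 'rebalance' on a supported table, assuming that syntax is
available) bypasses these checks, which is what the early return implements.
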
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 6a9a37dfe3f..7f690dbd6fc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -72,6 +72,76 @@ public class TestInitiator extends CompactorTest {
     startInitiator();
   }
 
+  @Test
+  public void compactRebalance() throws Exception {
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    //Set the thresholds so the rebalance compaction threshold is reached without reaching the major compaction threshold.
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
+    MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD, 0.02);
+
+    prepareRebalanceData();
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("rebalance", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.REBALANCE, compacts.get(0).getType());
+  }
+
+  @Test
+  public void noCompactRebalanceSmallTable() throws Exception {
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+
+    prepareRebalanceData();
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(0, compacts.size());
+  }
+
+  @Test
+  public void noCompactRebalanceDataBalanced() throws Exception {
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    //Set the minimum size so the initiator performs the check, but leave the rebalance threshold at its default.
+    //No rebalance compaction should be initiated.
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
+
+    prepareRebalanceData();
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(0, compacts.size());
+  }
+
+  private void prepareRebalanceData() throws Exception {
+    Table t = newTable("default", "rebalance", false);
+
+    addBaseFile(t, null, 200L, 200, 2, true);
+    addDeltaFile(t, null, 201L, 220L, 19, 2, false);
+
+    burnThroughTransactions("default", "rebalance", 220);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("rebalance");
+    comp.setOperationType(DataOperationType.UPDATE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    txnHandler.lock(req);
+    long writeid = allocateWriteId("default", "rebalance", txnid);
+    Assert.assertEquals(221, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+  }
+
   @Test
   public void recoverFailedLocalWorkers() throws Exception {
     Table t = newTable("default", "rflw1", false);
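
A note on the fixture arithmetic in prepareRebalanceData above: the base file
covers write IDs up to 200 and the delta covers 201-220, so after burning 220
transactions the next allocated write ID is 221, which the test asserts.
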
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 359b24bc7e6..72ed7c9ee08 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -432,6 +432,15 @@ public class MetastoreConf {
         "Time after Initiator will ignore metastore.compactor.initiator.failed.compacts.threshold "
             + "and retry with compaction again. This will try to auto heal tables with previous failed compaction "
             + "without manual intervention. Setting it to 0 or negative value will disable this feature."),
+    COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE("metastore.compactor.initiator.rebalance.min.size",
+        "hive.compactor.initiator.rebalance.min.size", 1024*1024*100,
+        "Minimum table/partition size for which a rebalancing compaction can be initiated."),
+    COMPACTOR_INITIATOR_REBALANCE_THRESHOLD("metastore.compactor.initiator.rebalance.threshold",
+        "hive.compactor.initiator.rebalance.threshold", 0.2d,
+        "Threshold for the rebalancing compaction. If the std_dev/average_bucket_size (where std_dev is the " +
+            "standard deviation of the bucket sizes) is larger than the threshold, a rebalance compaction is initiated. " +
+            "In other words (assuming that the value is 0.2): If the standard deviation is larger than 20% of the average " +
+            "bucket size, a rebalancing compaction is initiated. "),
     COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user", "hive.compactor.run.as.user", "",
         "Specify the user to run compactor Initiator and Worker as. If empty string, defaults to table/partition " +
         "directory owner."),