You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kr...@apache.org on 2023/01/10 07:30:37 UTC
[hive] branch master updated: HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)
This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 919e734d3b6 HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)
919e734d3b6 is described below
commit 919e734d3b67902a721afab1ca9a803855dbf2ec
Author: veghlaci05 <lv...@cloudera.com>
AuthorDate: Tue Jan 10 08:30:24 2023 +0100
HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)
---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 52 +++++++++++++++-
.../hadoop/hive/ql/txn/compactor/Worker.java | 3 +-
.../hive/ql/txn/compactor/TestInitiator.java | 70 ++++++++++++++++++++++
.../hadoop/hive/metastore/conf/MetastoreConf.java | 9 +++
4 files changed, 131 insertions(+), 3 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 612cd492ca1..96a778dfcb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -479,8 +479,8 @@ public class Initiator extends MetaStoreCompactorThread {
return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
}
- private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, Map<String,
- String> tblproperties, long baseSize, long deltaSize) {
+ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+ Map<String, String> tblproperties, long baseSize, long deltaSize) {
boolean noBase = false;
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
if (baseSize == 0 && deltaSize > 0) {
@@ -520,6 +520,10 @@ public class Initiator extends MetaStoreCompactorThread {
if (initiateMajor) return CompactionType.MAJOR;
}
+ if (scheduleRebalance(ci, dir, tblproperties, baseSize, deltaSize)) {
+ return CompactionType.REBALANCE;
+ }
+
String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
int deltaNumThreshold = deltaNumProp == null ?
@@ -539,6 +543,50 @@ public class Initiator extends MetaStoreCompactorThread {
CompactionType.MAJOR : CompactionType.MINOR;
}
+  /**
+   * Decides whether the Initiator should schedule a REBALANCE compaction.
+   * <p>
+   * The check only runs when the execution engine is Tez, query-based CRUD compaction is enabled and the table is a
+   * full ACID table, and only if base + delta size exceeds
+   * {@code MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE}. A rebalance is requested when the
+   * population standard deviation of the per-bucket file sizes is larger than
+   * {@code MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD} times the average bucket size.
+   *
+   * @param ci compaction candidate; its table name is used for logging
+   * @param dir ACID directory state whose bucket files are measured
+   * @param tblproperties table properties, used to verify that the table is full ACID
+   * @param baseSize size of the base
+   * @param deltaSize size of the deltas
+   * @return true if a REBALANCE compaction should be initiated; false otherwise, including when the bucket file
+   *         sizes could not be read
+   */
+  private boolean scheduleRebalance(CompactionInfo ci, AcidDirectory dir, Map<String, String> tblproperties, long baseSize, long deltaSize) {
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          if (bucketSizes.isEmpty()) {
+            // No bucket files were found: there is nothing to rebalance, and the divisions below would be by zero.
+            return false;
+          }
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // Population variance: the squared deviations are summed first and divided by the bucket count exactly
+          // once. Dividing inside the accumulator would shrink the partial sum on every step.
+          double variance = bucketSizes.values().stream()
+              .mapToDouble(size -> Math.pow(size - mean, 2))
+              .sum() / bucketSizes.size();
+          double standardDeviation = Math.sqrt(variance);
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("Initiating REBALANCE compaction on table {}", ci.tableName);
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.error("Error occured during checking bucket file sizes, rebalance threshold calculation is skipped.", e);
+        }
+      } else {
+        LOG.debug("Table is smaller than the minimum required size for REBALANCE compaction.");
+      }
+    }
+    return false;
+  }
+
private long getBaseSize(AcidDirectory dir) throws IOException {
long baseSize = 0;
if (dir.getBase() != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index e48b7488e67..7fa1cfca97b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -169,7 +169,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
boolean isEnoughToCompact;
if (ci.isRebalanceCompaction()) {
- //TODO: For now, we are allowing rebalance compaction regardless of the table state. Thresholds will be added later.
+ //Although thresholds are used to schedule REBALANCE compaction, manual triggering is always allowed if the
+ //table and query engine support it
return true;
} else if (ci.isMajorCompaction()) {
isEnoughToCompact =
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 6a9a37dfe3f..7f690dbd6fc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -72,6 +72,76 @@ public class TestInitiator extends CompactorTest {
startInitiator();
}
+  /**
+   * With Tez and query-based CRUD compaction enabled and both rebalance limits lowered, the Initiator is expected
+   * to queue exactly one compaction: an initiated REBALANCE on the "rebalance" table.
+   */
+  @Test
+  public void compactRebalance() throws Exception {
+    //Lower the thresholds so that the rebalance compaction threshold is reached without reaching the major
+    //compaction threshold.
+    MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD, 0.02);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+
+    prepareRebalanceData();
+    startInitiator();
+
+    List<ShowCompactResponseElement> queued = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(1, queued.size());
+    Assert.assertEquals("initiated", queued.get(0).getState());
+    Assert.assertEquals("rebalance", queued.get(0).getTablename());
+    Assert.assertEquals(CompactionType.REBALANCE, queued.get(0).getType());
+  }
+
+  /**
+   * Neither rebalance limit is overridden here, so the configured minimum size stays in effect (presumably larger
+   * than the prepared test data). No compaction of any kind is expected to be queued.
+   */
+  @Test
+  public void noCompactRebalanceSmallTable() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+
+    prepareRebalanceData();
+    startInitiator();
+
+    List<ShowCompactResponseElement> queued = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(0, queued.size());
+  }
+
+  /**
+   * Only the minimum size is lowered, so the Initiator evaluates the bucket sizes, but the rebalance threshold is
+   * left unchanged. No compaction is expected to be queued.
+   */
+  @Test
+  public void noCompactRebalanceDataBalanced() throws Exception {
+    //Lower the minimum size so the initiator performs the check, but leave the rebalance threshold untouched.
+    //No rebalance compaction should be initiated.
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+
+    prepareRebalanceData();
+    startInitiator();
+
+    List<ShowCompactResponseElement> queued = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(0, queued.size());
+  }
+
+ private void prepareRebalanceData() throws Exception {
+ Table t = newTable("default", "rebalance", false);
+
+ addBaseFile(t, null, 200L, 200, 2, true);
+ addDeltaFile(t, null, 201L, 220L, 19, 2, false);
+
+ burnThroughTransactions("default", "rebalance", 220);
+
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+ comp.setTablename("rebalance");
+ comp.setOperationType(DataOperationType.UPDATE);
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ txnHandler.lock(req);
+ long writeid = allocateWriteId("default", "rebalance", txnid);
+ Assert.assertEquals(221, writeid);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+ }
+
@Test
public void recoverFailedLocalWorkers() throws Exception {
Table t = newTable("default", "rflw1", false);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 359b24bc7e6..72ed7c9ee08 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -432,6 +432,15 @@ public class MetastoreConf {
"Time after Initiator will ignore metastore.compactor.initiator.failed.compacts.threshold "
+ "and retry with compaction again. This will try to auto heal tables with previous failed compaction "
+ "without manual intervention. Setting it to 0 or negative value will disable this feature."),
+ COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE("metastore.compactor.initiator.rebalance.min.size",
+ "hive.compactor.initiator.rebalance.min.size", 1024*1024*100,
+ "Minimum table/partition size for which a rebalancing compaction can be initiated."),
+ COMPACTOR_INITIATOR_REBALANCE_THRESHOLD("metastore.compactor.initiator.rebalance.threshold",
+ "hive.compactor.initiator.rebalance.threshold", 0.2d,
+ "Threshold for the rebalancing compaction. If the std_dev/average_bucket_size (where std_dev is the " +
+ "standard deviation of the bucket sizes) is larger than the threshold, a rebalance compaction is initiated. " +
+ "In other words (assuming that the value is 0.2): If the standard deviation is larger than 20% of the average " +
+ "bucket size, a rebalancing compaction is initiated. "),
COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user", "hive.compactor.run.as.user", "",
"Specify the user to run compactor Initiator and Worker as. If empty string, defaults to table/partition " +
"directory owner."),