You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/08/23 07:53:43 UTC
[hive] branch master updated: HIVE-26407: Do not compute statistics if the compaction fails (Zsolt Miskolczi, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8f30c8fe1a7 HIVE-26407: Do not compute statistics if the compaction fails (Zsolt Miskolczi, reviewed by Denys Kuzmenko)
8f30c8fe1a7 is described below
commit 8f30c8fe1a762c881d3b3570898ad048a4d36c0a
Author: InvisibleProgrammer <zs...@gmail.com>
AuthorDate: Tue Aug 23 09:53:32 2022 +0200
HIVE-26407: Do not compute statistics if the compaction fails (Zsolt Miskolczi, reviewed by Denys Kuzmenko)
Closes #3489
---
.../hive/ql/txn/compactor/TestCompactor.java | 8 +-
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 2 +-
.../hadoop/hive/ql/txn/compactor/StatsUpdater.java | 88 ++++++++++++++++++++++
.../hadoop/hive/ql/txn/compactor/Worker.java | 67 ++--------------
.../hadoop/hive/ql/txn/compactor/TestWorker.java | 25 ++++++
5 files changed, 124 insertions(+), 66 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7f3c239b465..e8a8ab9c49d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -124,6 +124,8 @@ public class TestCompactor {
IMetaStoreClient msClient;
private IDriver driver;
+ private StatsUpdater statsUpdater = new StatsUpdater();
+
@Before
public void setup() throws Exception {
@@ -466,10 +468,10 @@ public class TestCompactor {
//compute stats before compaction
CompactionInfo ci = new CompactionInfo(dbName, tblName, "bkt=0", CompactionType.MAJOR);
- Worker.StatsUpdater.gatherStats(ci, conf,
+ statsUpdater.gatherStats(ci, conf,
System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
ci = new CompactionInfo(dbName, tblName, "bkt=1", CompactionType.MAJOR);
- Worker.StatsUpdater.gatherStats(ci, conf,
+ statsUpdater.gatherStats(ci, conf,
System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
//Check basic stats are collected
@@ -563,7 +565,7 @@ public class TestCompactor {
//compute stats before compaction
CompactionInfo ci = new CompactionInfo(dbName, tblName, null, CompactionType.MAJOR);
- Worker.StatsUpdater.gatherStats(ci, conf,
+ statsUpdater.gatherStats(ci, conf,
System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
//Check basic stats are collected
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index eb9f4c4e2a8..a60a8d964dc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -336,7 +336,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
//compute stats before compaction
CompactionInfo ci = new CompactionInfo(dbName, tblName, null, CompactionType.MAJOR);
- Worker.StatsUpdater.gatherStats(ci, conf,
+ new StatsUpdater().gatherStats(ci, conf,
System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
//Check basic stats are collected
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
new file mode 100644
index 00000000000..1c4964be3a5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Updates table/partition statistics.
+ * Intended to run after a successful compaction.
+ */
+public final class StatsUpdater {
+ private static final Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
+ /**
+ * This doesn't throw any exceptions because we don't want the Compaction to appear as failed
+ * if stats gathering fails since this prevents Cleaner from doing its job and if there are
+ * multiple failures, auto initiated compactions will stop which leads to problems that are
+ * much worse than stale stats.
+ *
+ * todo: longer term we should write something to COMPACTION_QUEUE.CQ_META_INFO. This is a binary
+ * field so need to figure out the msg format and how to surface it in SHOW COMPACTIONS, etc
+ *
+ * @param ci Information about the compaction being run
+ * @param conf The hive configuration object
+ * @param userName The user to run the statistic collection with
+ * @param compactionQueueName The name of the compaction queue
+ */
+ public void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
+ try {
+ if (!ci.isMajorCompaction()) {
+ return;
+ }
+
+ HiveConf statusUpdaterConf = new HiveConf(conf);
+ statusUpdaterConf.unset(ValidTxnList.VALID_TXNS_KEY);
+
+ //e.g. analyze table page_view partition(dt='10/15/2014',country='US')
+ // compute statistics for columns viewtime
+ StringBuilder sb = new StringBuilder("analyze table ")
+ .append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName));
+ if (ci.partName != null) {
+ sb.append(" partition(");
+ Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
+ for (Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
+ sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',");
+ }
+ sb.setLength(sb.length() - 1); //remove trailing ,
+ sb.append(")");
+ }
+ sb.append(" compute statistics");
+ LOG.info(ci + ": running '" + sb + "'");
+ statusUpdaterConf.setVar(HiveConf.ConfVars.METASTOREURIS, "");
+ if (compactionQueueName != null && compactionQueueName.length() > 0) {
+ statusUpdaterConf.set(TezConfiguration.TEZ_QUEUE_NAME, compactionQueueName);
+ }
+ SessionState sessionState = DriverUtils.setUpSessionState(statusUpdaterConf, userName, true);
+ DriverUtils.runOnDriver(statusUpdaterConf, sessionState, sb.toString(), ci.highestWriteId);
+ } catch (Throwable t) {
+ LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName +
+ ") failed due to: " + t.getMessage(), t);
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index ea19328ef1d..f78f2329600 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -43,25 +43,20 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.txn.TxnStatus;
-import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hive.common.util.Ref;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
@@ -86,6 +81,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
private String workerName;
+ static StatsUpdater statsUpdater = new StatsUpdater();
+
// TODO: this doesn't check if compaction is already running (even though Initiator does but we
// don't go through Initiator for user initiated compactions)
@Override
@@ -145,62 +142,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
setName(workerName);
}
- @VisibleForTesting
- @ThreadSafe
- static final class StatsUpdater {
- private static final Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
-
- /**
- * This doesn't throw any exceptions because we don't want the Compaction to appear as failed
- * if stats gathering fails since this prevents Cleaner from doing it's job and if there are
- * multiple failures, auto initiated compactions will stop which leads to problems that are
- * much worse than stale stats.
- *
- * todo: longer term we should write something COMPACTION_QUEUE.CQ_META_INFO. This is a binary
- * field so need to figure out the msg format and how to surface it in SHOW COMPACTIONS, etc
- *
- * @param ci Information about the compaction being run
- * @param conf The hive configuration object
- * @param userName The user to run the statistic collection with
- * @param compactionQueueName The name of the compaction queue
- */
- static void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
- try {
- if (!ci.isMajorCompaction()) {
- return;
- }
-
- HiveConf statusUpdaterConf = new HiveConf(conf);
- statusUpdaterConf.unset(ValidTxnList.VALID_TXNS_KEY);
-
- //e.g. analyze table page_view partition(dt='10/15/2014',country=’US’)
- // compute statistics for columns viewtime
- StringBuilder sb = new StringBuilder("analyze table ")
- .append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName));
- if (ci.partName != null) {
- sb.append(" partition(");
- Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
- for (Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
- sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',");
- }
- sb.setLength(sb.length() - 1); //remove trailing ,
- sb.append(")");
- }
- sb.append(" compute statistics");
- LOG.info(ci + ": running '" + sb + "'");
- statusUpdaterConf.setVar(HiveConf.ConfVars.METASTOREURIS,"");
- if (compactionQueueName != null && compactionQueueName.length() > 0) {
- statusUpdaterConf.set(TezConfiguration.TEZ_QUEUE_NAME, compactionQueueName);
- }
- SessionState sessionState = DriverUtils.setUpSessionState(statusUpdaterConf, userName, true);
- DriverUtils.runOnDriver(statusUpdaterConf, sessionState, sb.toString(), ci.highestWriteId);
- } catch (Throwable t) {
- LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName +
- ") failed due to: " + t.getMessage(), t);
- }
- }
- }
-
/**
* Determine if compaction can run in a specified directory.
* @param isMajorCompaction type of compaction.
@@ -480,6 +421,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
final CompactionType ctype = ci.type;
markFailed(ci, e.getMessage());
+ computeStats = false;
+
if (runJobAsSelf(ci.runAs)) {
cleanupResultDirs(sd, tblValidWriteIds, ctype, dir);
} else {
@@ -516,7 +459,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
}
if (computeStats) {
- StatsUpdater.gatherStats(ci, conf, runJobAsSelf(ci.runAs) ? ci.runAs : t1.getOwner(),
+ statsUpdater.gatherStats(ci, conf, runJobAsSelf(ci.runAs) ? ci.runAs : t1.getOwner(),
CompactorUtil.getCompactorJobQueueName(conf, ci, t1));
}
return true;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 17920fd08f9..f5b1389d3bf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -1026,6 +1026,31 @@ public class TestWorker extends CompactorTest {
verify(msc, times(0)).markFailed(any());
}
+ @Test
+ public void testDoesNotGatherStatsIfCompactionFails() throws Exception {
+ StatsUpdater statsUpdater = Mockito.mock(StatsUpdater.class);
+
+ Table t = newTable("default", "mtwb", false);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+
+ burnThroughTransactions("default", "mtwb", 25);
+
+ txnHandler.compact(new CompactionRequest("default", "mtwb", CompactionType.MINOR));
+
+ Worker worker = Mockito.spy(new Worker());
+ Mockito.when(worker.getMrCompactor()).thenThrow(RuntimeException.class);
+ worker.setConf(conf);
+ worker.init(new AtomicBoolean(true));
+ Worker.statsUpdater = statsUpdater;
+
+ worker.findNextCompactionAndExecute(true, true);
+
+ Mockito.verify(statsUpdater, Mockito.never()).gatherStats(any(), any(), any(), any());
+ }
+
@Test
public void droppedTable() throws Exception {
Table t = newTable("default", "dt", false);