Posted to commits@hive.apache.org by dk...@apache.org on 2022/08/23 07:53:43 UTC

[hive] branch master updated: HIVE-26407: Do not compute statistics if the compaction fails (Zsolt Miskolczi, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f30c8fe1a7 HIVE-26407: Do not compute statistics if the compaction fails (Zsolt Miskolczi, reviewed by Denys Kuzmenko)
8f30c8fe1a7 is described below

commit 8f30c8fe1a762c881d3b3570898ad048a4d36c0a
Author: InvisibleProgrammer <zs...@gmail.com>
AuthorDate: Tue Aug 23 09:53:32 2022 +0200

    HIVE-26407: Do not compute statistics if the compaction fails (Zsolt Miskolczi, reviewed by Denys Kuzmenko)
    
    Closes #3489
---
 .../hive/ql/txn/compactor/TestCompactor.java       |  8 +-
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   |  2 +-
 .../hadoop/hive/ql/txn/compactor/StatsUpdater.java | 88 ++++++++++++++++++++++
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 67 ++--------------
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   | 25 ++++++
 5 files changed, 124 insertions(+), 66 deletions(-)
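
In short: StatsUpdater is extracted from Worker into its own top-level class, gatherStats becomes an instance method so tests can mock it, and Worker now sets computeStats to false whenever a compaction attempt is marked failed, so statistics are only refreshed after a successful run. A simplified sketch of the resulting flow in Worker (helper names such as runCompaction and collectGenericStats are placeholders, not the actual Worker methods):

    boolean computeStats = collectGenericStats(ci);   // placeholder for the existing "should we gather stats" check
    try {
        runCompaction(ci);                            // placeholder for the actual compaction work
    } catch (Exception e) {
        markFailed(ci, e.getMessage());               // the attempt is recorded as failed
        computeStats = false;                         // HIVE-26407: never analyze after a failed compaction
    }
    if (computeStats) {
        statsUpdater.gatherStats(ci, conf, runJobAsSelf(ci.runAs) ? ci.runAs : t1.getOwner(),
                CompactorUtil.getCompactorJobQueueName(conf, ci, t1));
    }
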

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7f3c239b465..e8a8ab9c49d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -124,6 +124,8 @@ public class TestCompactor {
   IMetaStoreClient msClient;
   private IDriver driver;
 
+  private StatsUpdater statsUpdater = new StatsUpdater();
+
   @Before
   public void setup() throws Exception {
 
@@ -466,10 +468,10 @@ public class TestCompactor {
 
     //compute stats before compaction
     CompactionInfo ci = new CompactionInfo(dbName, tblName, "bkt=0", CompactionType.MAJOR);
-    Worker.StatsUpdater.gatherStats(ci, conf,
+    statsUpdater.gatherStats(ci, conf,
             System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
     ci = new CompactionInfo(dbName, tblName, "bkt=1", CompactionType.MAJOR);
-    Worker.StatsUpdater.gatherStats(ci, conf,
+    statsUpdater.gatherStats(ci, conf,
             System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
 
     //Check basic stats are collected
@@ -563,7 +565,7 @@ public class TestCompactor {
 
     //compute stats before compaction
     CompactionInfo ci = new CompactionInfo(dbName, tblName, null, CompactionType.MAJOR);
-    Worker.StatsUpdater.gatherStats(ci, conf,
+    statsUpdater.gatherStats(ci, conf,
             System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
 
     //Check basic stats are collected
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index eb9f4c4e2a8..a60a8d964dc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -336,7 +336,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
     //compute stats before compaction
     CompactionInfo ci = new CompactionInfo(dbName, tblName, null, CompactionType.MAJOR);
-    Worker.StatsUpdater.gatherStats(ci, conf,
+    new StatsUpdater().gatherStats(ci, conf,
             System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
 
     //Check basic stats are collected
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
new file mode 100644
index 00000000000..1c4964be3a5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ *  Updates table/partition statistics.
+ *  Intended to run after a successful compaction.
+ */
+public final class StatsUpdater {
+    private static final Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
+    /**
+     * This method does not throw any exceptions: we do not want the compaction to be marked as failed
+     * just because stats gathering failed, since that would prevent the Cleaner from doing its job, and
+     * after multiple failures auto-initiated compactions stop altogether, which is a much worse problem
+     * than stale stats.
+     *
+     * TODO: longer term we should write something to COMPACTION_QUEUE.CQ_META_INFO. This is a binary
+     * field, so we need to figure out the message format and how to surface it in SHOW COMPACTIONS, etc.
+     *
+     * @param ci Information about the compaction being run
+     * @param conf The hive configuration object
+     * @param userName The user to run the statistic collection with
+     * @param compactionQueueName The name of the compaction queue
+     */
+    public void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
+        try {
+            if (!ci.isMajorCompaction()) {
+                return;
+            }
+
+            HiveConf statusUpdaterConf = new HiveConf(conf);
+            statusUpdaterConf.unset(ValidTxnList.VALID_TXNS_KEY);
+
+            //e.g. analyze table page_view partition(dt='10/15/2014',country='US')
+            // compute statistics for columns viewtime
+            StringBuilder sb = new StringBuilder("analyze table ")
+                    .append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName));
+            if (ci.partName != null) {
+                sb.append(" partition(");
+                Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
+                for (Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
+                    sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',");
+                }
+                sb.setLength(sb.length() - 1); //remove trailing ,
+                sb.append(")");
+            }
+            sb.append(" compute statistics");
+            LOG.info(ci + ": running '" + sb + "'");
+            statusUpdaterConf.setVar(HiveConf.ConfVars.METASTOREURIS, "");
+            if (compactionQueueName != null && compactionQueueName.length() > 0) {
+                statusUpdaterConf.set(TezConfiguration.TEZ_QUEUE_NAME, compactionQueueName);
+            }
+            SessionState sessionState = DriverUtils.setUpSessionState(statusUpdaterConf, userName, true);
+            DriverUtils.runOnDriver(statusUpdaterConf, sessionState, sb.toString(), ci.highestWriteId);
+        } catch (Throwable t) {
+            LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName +
+                    ") failed due to: " + t.getMessage(), t);
+        }
+    }
+}
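
A hypothetical standalone caller of the extracted class could look like the sketch below (class, database, table and partition names are illustrative, not part of the commit). Note that gatherStats only logs on failure and never throws, and that passing null for the queue name leaves the default Tez queue in place:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.CompactionType;
    import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    import org.apache.hadoop.hive.ql.txn.compactor.StatsUpdater;

    public class GatherStatsSketch {
        public static void main(String[] args) {
            HiveConf conf = new HiveConf();
            // Major compaction of one partition; this roughly runs
            // "analyze table default.page_view partition(dt='2014-10-15') compute statistics"
            CompactionInfo ci = new CompactionInfo("default", "page_view", "dt=2014-10-15", CompactionType.MAJOR);
            new StatsUpdater().gatherStats(ci, conf, System.getProperty("user.name"), null);
        }
    }
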
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index ea19328ef1d..f78f2329600 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -43,25 +43,20 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnStatus;
-import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hive.common.util.Ref;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
-import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
@@ -86,6 +81,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
 
   private String workerName;
 
+  static StatsUpdater statsUpdater = new StatsUpdater();
+
   // TODO: this doesn't check if compaction is already running (even though Initiator does but we
   //  don't go through Initiator for user initiated compactions)
   @Override
@@ -145,62 +142,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     setName(workerName);
   }
 
-  @VisibleForTesting
-  @ThreadSafe
-  static final class StatsUpdater {
-    private static final Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
-
-    /**
-     * This doesn't throw any exceptions because we don't want the Compaction to appear as failed
-     * if stats gathering fails since this prevents Cleaner from doing it's job and if there are
-     * multiple failures, auto initiated compactions will stop which leads to problems that are
-     * much worse than stale stats.
-     *
-     * todo: longer term we should write something COMPACTION_QUEUE.CQ_META_INFO.  This is a binary
-     * field so need to figure out the msg format and how to surface it in SHOW COMPACTIONS, etc
-     *
-     * @param ci Information about the compaction being run
-     * @param conf The hive configuration object
-     * @param userName The user to run the statistic collection with
-     * @param compactionQueueName The name of the compaction queue
-     */
-    static void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
-      try {
-        if (!ci.isMajorCompaction()) {
-          return;
-        }
-
-        HiveConf statusUpdaterConf = new HiveConf(conf);
-        statusUpdaterConf.unset(ValidTxnList.VALID_TXNS_KEY);
-
-        //e.g. analyze table page_view partition(dt='10/15/2014',country=’US’)
-        // compute statistics for columns viewtime
-        StringBuilder sb = new StringBuilder("analyze table ")
-            .append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName));
-        if (ci.partName != null) {
-          sb.append(" partition(");
-          Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
-          for (Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
-            sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',");
-          }
-          sb.setLength(sb.length() - 1); //remove trailing ,
-          sb.append(")");
-        }
-        sb.append(" compute statistics");
-        LOG.info(ci + ": running '" + sb + "'");
-        statusUpdaterConf.setVar(HiveConf.ConfVars.METASTOREURIS,"");
-        if (compactionQueueName != null && compactionQueueName.length() > 0) {
-          statusUpdaterConf.set(TezConfiguration.TEZ_QUEUE_NAME, compactionQueueName);
-        }
-        SessionState sessionState = DriverUtils.setUpSessionState(statusUpdaterConf, userName, true);
-        DriverUtils.runOnDriver(statusUpdaterConf, sessionState, sb.toString(), ci.highestWriteId);
-      } catch (Throwable t) {
-        LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName +
-                      ") failed due to: " + t.getMessage(), t);
-      }
-    }
-  }
-
   /**
    * Determine if compaction can run in a specified directory.
    * @param isMajorCompaction type of compaction.
@@ -480,6 +421,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
         final CompactionType ctype = ci.type;
         markFailed(ci, e.getMessage());
 
+        computeStats = false;
+
         if (runJobAsSelf(ci.runAs)) {
           cleanupResultDirs(sd, tblValidWriteIds, ctype, dir);
         } else {
@@ -516,7 +459,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     }
 
     if (computeStats) {
-      StatsUpdater.gatherStats(ci, conf, runJobAsSelf(ci.runAs) ? ci.runAs : t1.getOwner(),
+      statsUpdater.gatherStats(ci, conf, runJobAsSelf(ci.runAs) ? ci.runAs : t1.getOwner(),
               CompactorUtil.getCompactorJobQueueName(conf, ci, t1));
     }
     return true;
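
Note on the design choice: because gatherStats is now an instance method and Worker holds it in a package-visible static field, a test can swap in a mock and assert that no statistics are gathered when the compaction attempt fails, which is exactly what the new TestWorker case below does.
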
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 17920fd08f9..f5b1389d3bf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -1026,6 +1026,31 @@ public class TestWorker extends CompactorTest {
     verify(msc, times(0)).markFailed(any());
   }
 
+  @Test
+  public void testDoesNotGatherStatsIfCompactionFails() throws Exception {
+    StatsUpdater statsUpdater = Mockito.mock(StatsUpdater.class);
+
+    Table t = newTable("default", "mtwb", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+
+    burnThroughTransactions("default", "mtwb", 25);
+
+    txnHandler.compact(new CompactionRequest("default", "mtwb", CompactionType.MINOR));
+
+    Worker worker = Mockito.spy(new Worker());
+    Mockito.when(worker.getMrCompactor()).thenThrow(RuntimeException.class);
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    Worker.statsUpdater = statsUpdater;
+
+    worker.findNextCompactionAndExecute(true, true);
+
+    Mockito.verify(statsUpdater, Mockito.never()).gatherStats(any(), any(), any(), any());
+  }
+
   @Test
   public void droppedTable() throws Exception {
     Table t = newTable("default", "dt", false);