You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2021/10/11 12:22:41 UTC

[hive] branch master updated: HIVE-25595: Custom queue settings is not honoured by compaction StatsUpdater (#2702) (Laszlo Pinter, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c9a71  HIVE-25595: Custom queue settings is not honoured by compaction StatsUpdater (#2702) (Laszlo Pinter, reviewed by Denys Kuzmenko)
32c9a71 is described below

commit 32c9a71ca3481688071fc1ba1db8685adcb2a6fd
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Mon Oct 11 14:22:33 2021 +0200

    HIVE-25595: Custom queue settings is not honoured by compaction StatsUpdater (#2702) (Laszlo Pinter, reviewed by Denys Kuzmenko)
---
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |  26 ++++-
 .../hive/ql/txn/compactor/TestCompactor.java       |   6 +-
 .../ql/txn/compactor/TestMRCompactorOnTez.java     | 108 +++++++++++++++++++++
 .../hive/ql/txn/compactor/CompactorUtil.java       |  37 +++++++
 .../hadoop/hive/ql/txn/compactor/Worker.java       |  41 +++-----
 5 files changed, 184 insertions(+), 34 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 05bbb74..99d17a2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -54,6 +54,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.Set;
@@ -139,11 +140,12 @@ class CompactorTestUtil {
    * @param tblName table name
    * @param compactionType major/minor
    * @param isQueryBased true, if query based compaction should be run
+   * @param properties compaction request properties
    * @param partNames partition names
    * @throws Exception compaction cannot be started.
    */
   static void runCompaction(HiveConf conf, String dbName, String tblName, CompactionType compactionType,
-      boolean isQueryBased, String... partNames) throws Exception {
+                            boolean isQueryBased, Map<String, String> properties, String... partNames) throws  Exception {
     HiveConf hiveConf = new HiveConf(conf);
     hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased);
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
@@ -151,12 +153,15 @@ class CompactorTestUtil {
     t.setThreadId((int) t.getId());
     t.setConf(hiveConf);
     t.init(new AtomicBoolean(true));
+    CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType);
+    if (properties != null) {
+      cr.setProperties(properties);
+    }
     if (partNames.length == 0) {
-      txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
+      txnHandler.compact(cr);
       t.run();
     } else {
       for (String partName : partNames) {
-        CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType);
         cr.setPartitionname(partName);
         txnHandler.compact(cr);
         t.run();
@@ -165,6 +170,21 @@ class CompactorTestUtil {
   }
 
   /**
+   * Trigger a compaction run.
+   * @param conf hive configuration
+   * @param dbName database name
+   * @param tblName table name
+   * @param compactionType major/minor
+   * @param isQueryBased true, if query based compaction should be run
+   * @param partNames partition names
+   * @throws Exception compaction cannot be started.
+   */
+  static void runCompaction(HiveConf conf, String dbName, String tblName, CompactionType compactionType,
+      boolean isQueryBased, String... partNames) throws Exception {
+    runCompaction(conf, dbName, tblName, compactionType, isQueryBased, null, partNames);
+  }
+
+  /**
    * Trigger a compaction cleaner.
    * @param hConf hive configuration
    * @throws Exception if cleaner cannot be started.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index c5056c5..a5b550e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -380,15 +380,17 @@ public class TestCompactor {
 
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
+    Table table = msClient.getTable("default", tblName);
     LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
     Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
-      System.getProperty("user.name"));
+      System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table));
     su.gatherStats();//compute stats before compaction
     LOG.debug("List of stats columns after analyze Part1: " + txnHandler.findColumnsWithStats(ci));
 
     CompactionInfo ciPart2 = new CompactionInfo("default", tblName, "bkt=1", CompactionType.MAJOR);
     LOG.debug("List of stats columns before analyze Part2: " + txnHandler.findColumnsWithStats(ci));
-    su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name"));
+    su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name"),
+        CompactorUtil.getCompactorJobQueueName(conf, ciPart2, table));
     su.gatherStats();//compute stats before compaction
     LOG.debug("List of stats columns after analyze Part2: " + txnHandler.findColumnsWithStats(ci));
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMRCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMRCompactorOnTez.java
new file mode 100644
index 0000000..36154f4
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMRCompactorOnTez.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
+import static org.junit.Assert.assertEquals;
+
+public class TestMRCompactorOnTez extends CompactorOnTezTest {
+
+  @Test
+  public void testCompactorGatherStats() throws Exception{
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, true);
+    conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    String tmpFolder = folder.newFolder().getAbsolutePath();
+    conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
+
+    String dbName = "default";
+    String tableName = "stats_comp_test";
+    List<String> colNames = Arrays.asList("a");
+
+    executeStatementOnDriver("drop table if exists " + dbName + "." + tableName, driver);
+    executeStatementOnDriver("create table " + dbName + "." + tableName +
+        " (a INT) STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + dbName + "." + tableName + " values(1)", driver);
+
+    // Make sure column statistics already exist for this table before compacting:
+    // compaction regenerates stats only for columns that already have them
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    executeStatementOnDriver("analyze table " + dbName + "." + tableName + " compute statistics for columns", driver);
+    executeStatementOnDriver("insert into " + dbName + "." + tableName + " values(2)", driver);
+
+    conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, HiveProtoLoggingHook.class.getName());
+    // Run major compaction and cleaner
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, false);
+    conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, StringUtils.EMPTY);
+
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessfulCompaction(1);
+
+    List<ColumnStatisticsObj> colStats = msClient.getTableColumnStatistics(dbName, tableName, colNames, Constants.HIVE_ENGINE);
+    assertEquals("Stats should be there", 1, colStats.size());
+    assertEquals("Value should contain new data", 2, colStats.get(0).getStatsData().getLongStats().getHighValue());
+    assertEquals("Value should contain new data", 1, colStats.get(0).getStatsData().getLongStats().getLowValue());
+
+    executeStatementOnDriver("insert into " + dbName + "." + tableName + " values(3)", driver);
+    executeStatementOnDriver("alter table " + dbName + "." + tableName + " set tblproperties('compactor.mapred.job.queue.name'='" +
+        CUSTOM_COMPACTION_QUEUE + "')", driver);
+
+    conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, HiveProtoLoggingHook.class.getName());
+    // Run major compaction and cleaner
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, false);
+    conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, StringUtils.EMPTY);
+
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessfulCompaction(2);
+
+    colStats = msClient.getTableColumnStatistics(dbName, tableName, colNames, Constants.HIVE_ENGINE);
+    assertEquals("Stats should be there", 1, colStats.size());
+    assertEquals("Value should contain new data", 3, colStats.get(0).getStatsData().getLongStats().getHighValue());
+    assertEquals("Value should contain new data", 1, colStats.get(0).getStatsData().getLongStats().getLowValue());
+
+    executeStatementOnDriver("insert into " + dbName + "." + tableName + " values(4)", driver);
+    conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, HiveProtoLoggingHook.class.getName());
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, false,
+        Collections.singletonMap("compactor.mapred.job.queue.name", CUSTOM_COMPACTION_QUEUE));
+    conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, StringUtils.EMPTY);
+
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessfulCompaction(3);
+
+    colStats = msClient.getTableColumnStatistics(dbName, tableName, colNames, Constants.HIVE_ENGINE);
+    assertEquals("Stats should be there", 1, colStats.size());
+    assertEquals("Value should contain new data", 4, colStats.get(0).getStatsData().getLongStats().getHighValue());
+    assertEquals("Value should contain new data", 1, colStats.get(0).getStatsData().getLongStats().getLowValue());
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 28fc642..a196e2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -26,6 +29,8 @@ import java.util.concurrent.ThreadFactory;
 
 public class CompactorUtil {
   public static final String COMPACTOR = "compactor";
+  static final String COMPACTOR_PREFIX = "compactor.";
+  static final String MAPRED_QUEUE_NAME = "mapred.job.queue.name";
 
   public interface ThrowingRunnable<E extends Exception> {
     void run() throws E;
@@ -52,4 +57,36 @@ public class CompactorUtil {
   public static ExecutorService createExecutorWithThreadFactory(int threadCount, String threadNameFormat) {
     return Executors.newFixedThreadPool(threadCount, createThreadFactory(threadNameFormat));
   }
+
+  /**
+   * Get the compactor queue name if it's defined.
+   * @param conf global hive conf
+   * @param ci compaction info object
+   * @param table instance of table
+   * @return name of the queue, can be null
+   */
+  static String getCompactorJobQueueName(HiveConf conf, CompactionInfo ci, Table table) {
+    // Get queue name from the compaction request properties (ci.properties). These are passed through
+    // ALTER TABLE table_name COMPACT 'major' WITH OVERWRITE TBLPROPERTIES('compactor.mapred.job.queue.name'='some_queue')
+    if (ci.properties != null) {
+      StringableMap ciProperties = new StringableMap(ci.properties);
+      String queueName = ciProperties.get(COMPACTOR_PREFIX + MAPRED_QUEUE_NAME);
+      if (queueName != null && queueName.length() > 0) {
+        return queueName;
+      }
+    }
+
+    // Get queue name from the table properties
+    String queueName = table.getParameters().get(COMPACTOR_PREFIX + MAPRED_QUEUE_NAME);
+    if (queueName != null && queueName.length() > 0) {
+      return queueName;
+    }
+
+    // Get queue name from global hive conf
+    queueName = conf.get(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE.varname);
+    if (queueName != null && queueName.length() > 0) {
+      return queueName;
+    }
+    return null;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index c7cdc95..1b8a13f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -42,11 +42,12 @@ import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnStatus;
+import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,8 +56,6 @@ import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -150,8 +149,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
 
     public static StatsUpdater init(CompactionInfo ci, List<String> columnListForStats,
-        HiveConf conf, String userName) {
-      return new StatsUpdater(ci, columnListForStats, conf, userName);
+        HiveConf conf, String userName, String compactionQueueName) {
+      return new StatsUpdater(ci, columnListForStats, conf, userName, compactionQueueName);
     }
 
     /**
@@ -162,14 +161,16 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     private final HiveConf conf;
     private final String userName;
     private final CompactionInfo ci;
+    private final String compactionQueueName;
 
     private StatsUpdater(CompactionInfo ci, List<String> columnListForStats,
-        HiveConf conf, String userName) {
+        HiveConf conf, String userName, String compactionQueueName) {
       this.conf = new HiveConf(conf);
       //so that Driver doesn't think it's arleady in a transaction
       this.conf.unset(ValidTxnList.VALID_TXNS_KEY);
       this.userName = userName;
       this.ci = ci;
+      this.compactionQueueName = compactionQueueName;
       if (!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) {
         columnList = Collections.emptyList();
         return;
@@ -215,29 +216,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
         sb.setLength(sb.length() - 1); //remove trailing ,
         LOG.info(ci + ": running '" + sb.toString() + "'");
         conf.setVar(HiveConf.ConfVars.METASTOREURIS,"");
-
-        //todo: use DriverUtils.runOnDriver() here
-        QueryState queryState = new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build();
-        SessionState localSession = null;
-        try (Driver d = new Driver(queryState)) {
-          if (SessionState.get() == null) {
-            localSession = new SessionState(conf);
-            SessionState.start(localSession);
-          }
-          try {
-            d.run(sb.toString());
-          } catch (CommandProcessorException e) {
-            LOG.warn(ci + ": " + sb.toString() + " failed due to: " + e);
-          }
-        } finally {
-          if (localSession != null) {
-            try {
-              localSession.close();
-            } catch (IOException ex) {
-              LOG.warn(ci + ": localSession.close() failed due to: " + ex.getMessage(), ex);
-            }
-          }
+        if (compactionQueueName != null && compactionQueueName.length() > 0) {
+          conf.set(TezConfiguration.TEZ_QUEUE_NAME, compactionQueueName);
         }
+        SessionState sessionState = DriverUtils.setUpSessionState(conf, userName, true);
+        DriverUtils.runOnDriver(conf, userName, sessionState, sb.toString());
       } catch (Throwable t) {
         LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName +
                       ") failed due to: " + t.getMessage(), t);
@@ -515,7 +498,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
                    compactionTxn + " with compute stats set to " + computeStats);
       final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats(
           CompactionInfo.compactionInfoToStruct(ci)), conf,
-          runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null;
+          runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner(), CompactorUtil.getCompactorJobQueueName(conf, ci, t)) : null;
 
       try {
         failCompactionIfSetForTest();