You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by za...@apache.org on 2022/02/18 10:42:05 UTC

[hive] branch master updated: HIVE-25947: Compactor job queue cannot be set per table via compactor.mapred.job.queue.name (Stamatis Zampetakis, reviewed by Alessandro Solimando, Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 03ac88f  HIVE-25947: Compactor job queue cannot be set per table via compactor.mapred.job.queue.name (Stamatis Zampetakis, reviewed by Alessandro Solimando, Denys Kuzmenko)
03ac88f is described below

commit 03ac88f3fc619fbc521b256c61b887dd2e291d60
Author: Stamatis Zampetakis <za...@gmail.com>
AuthorDate: Mon Jan 31 11:45:17 2022 +0100

    HIVE-25947: Compactor job queue cannot be set per table via compactor.mapred.job.queue.name (Stamatis Zampetakis, reviewed by Alessandro Solimando, Denys Kuzmenko)
    
    Adapt the MR compactor to accept all the properties below:
    * compactor.mapred.job.queue.name
    * compactor.mapreduce.job.queuename
    * compactor.hive.compactor.job.queue
    
    for specifying the job queue per table and per compaction request. The
    change restores backward compatibility and also enables the use of the
    non-deprecated MR properties.
    
    Add unit tests defining and guarding the precedence among the
    aforementioned properties and among the different levels (compaction
    request, table, global configuration) at which a queue can be defined.
    
    Closes #3027
---
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   4 +-
 .../hive/ql/txn/compactor/CompactorUtil.java       |  45 ++--
 .../TestCompactorMRJobQueueConfiguration.java      | 262 +++++++++++++++++++++
 3 files changed, 292 insertions(+), 19 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 01fdffa..64a6e4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -147,8 +147,8 @@ public class CompactorMR {
       overrideTblProps(job, t.getParameters(), ci.properties);
     }
 
-    String queueName = HiveConf.getVar(job, ConfVars.COMPACTOR_JOB_QUEUE);
-    if (queueName != null && queueName.length() > 0) {
+    String queueName = CompactorUtil.getCompactorJobQueueName(conf, ci, t); // compaction request, table, then global conf
+    if (!queueName.isEmpty()) { // empty: nothing resolved, so the job's queue is left untouched
       job.setQueueName(queueName);
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 3644f9e..43781c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -22,16 +22,29 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.function.Function;
 
 import static java.lang.String.format;
 
 public class CompactorUtil {
   public static final String COMPACTOR = "compactor";
-  static final String COMPACTOR_PREFIX = "compactor.";
-  static final String MAPRED_QUEUE_NAME = "mapred.job.queue.name";
+  /**
+   * Property keys (each carrying the {@code compactor.} prefix) accepted for defining the compactor's job queue.
+   *
+   * <p>The order is important: when several keys are defined in the same source (compaction request or table),
+   * earlier entries take precedence, e.g. the Hive key wins over the deprecated {@code mapred.job.queue.name}.
+   */
+  private static final List<String> QUEUE_PROPERTIES = Arrays.asList(
+      "compactor." + HiveConf.ConfVars.COMPACTOR_JOB_QUEUE.varname,
+      "compactor.mapreduce.job.queuename",
+      "compactor.mapred.job.queue.name"
+  );
 
   public interface ThrowingRunnable<E extends Exception> {
     void run() throws E;
@@ -62,31 +75,29 @@ public class CompactorUtil {
    * @param conf global hive conf
    * @param ci compaction info object
    * @param table instance of table
-   * @return name of the queue, can be null
+   * @return name of the queue, or an empty string if no queue is configured at any level
    */
   static String getCompactorJobQueueName(HiveConf conf, CompactionInfo ci, Table table) {
     // Get queue name from the ci. This is passed through
     // ALTER TABLE table_name COMPACT 'major' WITH OVERWRITE TBLPROPERTIES('compactor.hive.compactor.job.queue'='some_queue')
+    List<Function<String, String>> propertyGetters = new ArrayList<>(2); // property sources, highest precedence first
     if (ci.properties != null) {
       StringableMap ciProperties = new StringableMap(ci.properties);
-      String queueName = ciProperties.get(COMPACTOR_PREFIX + MAPRED_QUEUE_NAME);
-      if (queueName != null && queueName.length() > 0) {
-        return queueName;
-      }
+      propertyGetters.add(ciProperties::get); // per-compaction request properties outrank table parameters
     }
-
-    // Get queue name from the table properties
-    String queueName = table.getParameters().get(COMPACTOR_PREFIX + MAPRED_QUEUE_NAME);
-    if (queueName != null && queueName.length() > 0) {
-      return queueName;
+    if (table.getParameters() != null) {
+      propertyGetters.add(table.getParameters()::get); // table parameters
     }
 
-    // Get queue name from global hive conf
-    queueName = conf.get(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE.varname);
-    if (queueName != null && queueName.length() > 0) {
-      return queueName;
+    for (Function<String, String> getter : propertyGetters) { // the source is the primary precedence key
+      for (String p : QUEUE_PROPERTIES) { // key order only breaks ties within the same source
+        String queueName = getter.apply(p);
+        if (queueName != null && !queueName.isEmpty()) { // empty values are skipped, not treated as a queue
+          return queueName;
+        }
+      }
     }
-    return null;
+    return conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE); // global fallback, possibly empty
   }
 
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorMRJobQueueConfiguration.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorMRJobQueueConfiguration.java
new file mode 100644
index 0000000..17b200325
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorMRJobQueueConfiguration.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.StringableMap;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link CompactorMR#createBaseJobConf(HiveConf, String, Table, StorageDescriptor, ValidWriteIdList, CompactionInfo)}.
+ */
+public class TestCompactorMRJobQueueConfiguration {
+  // Each case pairs table/compaction/global queue properties with the queue the resulting JobConf must use.
+  @ParameterizedTest
+  @MethodSource("generateBaseJobConfSetup")
+  void testCreateBaseJobConfHasCorrectJobQueue(ConfSetup input) {
+    Table tbl = createPersonTable();
+    tbl.setParameters(input.tableProperties);
+    CompactorMR compactor = new CompactorMR();
+    CompactionInfo ci = new CompactionInfo(tbl.getDbName(), tbl.getTableName(), null, CompactionType.MAJOR);
+    ci.properties = new StringableMap(input.compactionProperties).toString();
+    HiveConf conf = new HiveConf();
+    input.confProperties.forEach(conf::set);
+    JobConf c = compactor.createBaseJobConf(conf, "test-job", tbl, tbl.getSd(), new ValidReaderWriteIdList(), ci);
+    assertEquals(input.expectedQueue, c.getQueueName(), "Test failed for the following input: " + input);
+  }
+
+  private static Stream<ConfSetup> generateBaseJobConfSetup() {
+    List<ConfSetup> inputs = new ArrayList<>();
+    String mrProperty = "mapreduce.job.queuename";
+    String hiveProperty = "hive.compactor.job.queue";
+    String mrDeprecated = "mapred.job.queue.name";
+    // Use u1 to u3 values for table properties
+    String u1 = "root.user1";
+    String u2 = "root.user2";
+    String u3 = "root.user3";
+    // Use u4 to u6 values for compaction properties
+    String u4 = "root.user4";
+    String u5 = "root.user5";
+    String u6 = "root.user6";
+    // Use su1 to su3 for global properties
+    String su1 = "superuser1";
+    String su2 = "superuser2";
+    String su3 = "superuser3";
+    // Check precedence of queue properties when set per table
+    // CREATE TABLE ... TBLPROPERTIES (...)
+    inputs.add(new ConfSetup()
+        .tableProperty(mrDeprecated, u1)
+        .tableProperty(mrProperty, u2)
+        .tableProperty(hiveProperty, u3)
+        .setExpectedQueue(u3));
+    inputs.add(new ConfSetup().tableProperty(mrDeprecated, u1).tableProperty(hiveProperty, u3).setExpectedQueue(u3));
+    inputs.add(new ConfSetup().tableProperty(mrProperty, u2).tableProperty(hiveProperty, u3).setExpectedQueue(u3));
+    inputs.add(new ConfSetup().tableProperty(mrDeprecated, u1).tableProperty(mrProperty, u2).setExpectedQueue(u2));
+    inputs.add(new ConfSetup().tableProperty(mrDeprecated, u1).setExpectedQueue(u1));
+    inputs.add(new ConfSetup().tableProperty(mrProperty, u2).setExpectedQueue(u2));
+    inputs.add(new ConfSetup().tableProperty(hiveProperty, u3).setExpectedQueue(u3));
+    // Check precedence of queue properties when set per compaction request
+    // ALTER TABLE ... COMPACT ... WITH OVERWRITE TBLPROPERTIES (...)
+    inputs.add(new ConfSetup()
+        .compactionProperty(mrDeprecated, u4)
+        .compactionProperty(mrProperty, u5)
+        .compactionProperty(hiveProperty, u6)
+        .setExpectedQueue(u6));
+    inputs.add(new ConfSetup()
+        .compactionProperty(mrDeprecated, u4)
+        .compactionProperty(hiveProperty, u6)
+        .setExpectedQueue(u6));
+    inputs.add(new ConfSetup()
+        .compactionProperty(mrProperty, u5)
+        .compactionProperty(hiveProperty, u6)
+        .setExpectedQueue(u6));
+    inputs.add(new ConfSetup()
+        .compactionProperty(mrDeprecated, u4)
+        .compactionProperty(mrProperty, u5)
+        .setExpectedQueue(u5));
+    inputs.add(new ConfSetup().compactionProperty(mrDeprecated, u4).setExpectedQueue(u4));
+    inputs.add(new ConfSetup().compactionProperty(mrProperty, u5).setExpectedQueue(u5));
+    inputs.add(new ConfSetup().compactionProperty(hiveProperty, u6).setExpectedQueue(u6));
+    // Check precedence of queue properties when set globally
+    inputs.add(new ConfSetup().globalProperty(hiveProperty, su1).setExpectedQueue(su1));
+    inputs.add(new ConfSetup().globalProperty(mrProperty, su2).setExpectedQueue(su2));
+    inputs.add(new ConfSetup().globalProperty(mrDeprecated, su3).setExpectedQueue(su3));
+    inputs.add(new ConfSetup()
+        .globalProperty(hiveProperty, su1)
+        .globalProperty(mrProperty, su2)
+        .globalProperty(mrDeprecated, su3)
+        .setExpectedQueue(su1));
+    inputs.add(new ConfSetup()
+        .globalProperty(mrProperty, su2)
+        .globalProperty(mrDeprecated, su3)
+        .setExpectedQueue(su2));
+    inputs.add(new ConfSetup()
+        .globalProperty(hiveProperty, su1)
+        .globalProperty(mrDeprecated, su3)
+        .setExpectedQueue(su1));
+    inputs.add(new ConfSetup().globalProperty(hiveProperty, su1).globalProperty(mrProperty, su2).setExpectedQueue(su1));
+    // Check precedence of queue properties when set per table, per compaction request, globally. The expected order is:
+    // i)   compaction request,
+    // ii)  table properties,
+    // iii) global conf
+    inputs.add(new ConfSetup()
+        .tableProperty(hiveProperty, u3)
+        .compactionProperty(hiveProperty, u6)
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u6));
+    inputs.add(new ConfSetup()
+        .tableProperty(hiveProperty, u3)
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u3));
+    inputs.add(new ConfSetup()
+        .compactionProperty(hiveProperty, u6)
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u6));
+    inputs.add(new ConfSetup()
+        .tableProperty(hiveProperty, u3)
+        .compactionProperty(hiveProperty, u6)
+        .setExpectedQueue(u6));
+    // Check combination of MR properties at table/compaction level and Hive property globally.
+    inputs.add(new ConfSetup()
+        .tableProperty(mrProperty, u2)
+        .compactionProperty(mrProperty, u5)
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u5));
+    inputs.add(new ConfSetup()
+        .tableProperty(mrDeprecated, u1)
+        .compactionProperty(mrDeprecated, u4)
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u4));
+    inputs.add(new ConfSetup()
+        .tableProperty(mrProperty, u2)
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u2));
+    inputs.add(new ConfSetup()
+        .tableProperty(mrDeprecated, u1)
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u1));
+    // Check empty properties are ignored
+    inputs.add(new ConfSetup()
+        .tableProperty(hiveProperty, u3)
+        .compactionProperty(hiveProperty, "")
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(u3));
+    inputs.add(new ConfSetup()
+        .tableProperty(hiveProperty, "")
+        .compactionProperty(hiveProperty, "")
+        .globalProperty(hiveProperty, su1)
+        .setExpectedQueue(su1));
+    inputs.add(new ConfSetup()
+        .tableProperty(hiveProperty, "")
+        .compactionProperty(hiveProperty, "")
+        .globalProperty(hiveProperty, "")
+        .setExpectedQueue("default"));
+    return inputs.stream();
+  }
+
+  /**
+   * Creates a minimal table resembling a PERSON type (see DDL below) for testing purposes.
+   *
+   * <pre>{@code
+   * CREATE TABLE default.person (id INT, name STRING) STORED AS TEXTFILE
+   * }</pre>
+   *
+   * @return a new table representing a person.
+   */
+  private static Table createPersonTable() {
+    FieldSchema idField = new FieldSchema();
+    idField.setName("id");
+    idField.setType("int");
+    FieldSchema nameField = new FieldSchema();
+    nameField.setName("name");
+    nameField.setType("string");
+    StorageDescriptor descriptor = new StorageDescriptor();
+    descriptor.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
+    descriptor.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
+    descriptor.setLocation("hdfs:///apps/hive/warehouse/default.db/person");
+    descriptor.setCompressed(false);
+    descriptor.setCols(Arrays.asList(idField, nameField));
+    long createTime = LocalDate.of(2022, 1, 24).atStartOfDay().toInstant(ZoneOffset.UTC).getEpochSecond();
+    Table tbl = new Table();
+    tbl.setDbName("default");
+    tbl.setTableName("person");
+    tbl.setOwner("hive");
+    tbl.setCreateTime(Math.toIntExact(createTime));
+    tbl.setLastAccessTime(Math.toIntExact(createTime));
+    tbl.setSd(descriptor);
+    tbl.setParameters(Collections.emptyMap());
+    return tbl;
+  }
+
+  /**
+   * Class for constructing and keeping property configurations for testing the compactor's job queue.
+   */
+  private static class ConfSetup {
+    private final Map<String, String> tableProperties = new HashMap<>();
+    private final Map<String, String> compactionProperties = new HashMap<>();
+    private final Map<String, String> confProperties = new HashMap<>();
+    private String expectedQueue;
+
+    ConfSetup tableProperty(String key, String value) {
+      tableProperties.put("compactor." + key, value);
+      return this;
+    }
+
+    ConfSetup compactionProperty(String key, String value) {
+      compactionProperties.put("compactor." + key, value);
+      return this;
+    }
+
+    ConfSetup globalProperty(String key, String value) {
+      confProperties.put(key, value);
+      return this;
+    }
+
+    ConfSetup setExpectedQueue(String name) {
+      expectedQueue = name;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "ConfSetup{" + "tableProperties=" + tableProperties + ", compactionProperties=" + compactionProperties
+          + ", confProperties=" + confProperties + ", expectedQueue=" + expectedQueue + '}';
+    }
+  }
+}