You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2018/10/13 23:52:11 UTC

[2/2] hive git commit: HIVE-20723: Allow per table specification of compaction yarn queue (Saurabh Seth via Eugene Koifman)

HIVE-20723: Allow per table specification of compaction yarn queue (Saurabh Seth via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/35278429
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/35278429
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/35278429

Branch: refs/heads/master
Commit: 35278429d9677b0878a4523ed7b03a5016f81e1d
Parents: 3c6a36b
Author: Saurabh Seth <sa...@gmail.com>
Authored: Sat Oct 13 16:46:16 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Sat Oct 13 16:46:16 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/txn/compactor/TestCompactor.java      | 10 ++++++++--
 .../apache/hadoop/hive/ql/txn/compactor/CompactorMR.java | 11 ++++++-----
 2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/35278429/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index cffa21a..a9d7468 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1584,6 +1584,7 @@ public class TestCompactor {
    */
   @Test
   public void testTableProperties() throws Exception {
+    conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, "root.user1");
     String tblName1 = "ttp1"; // plain acid table
     String tblName2 = "ttp2"; // acid table with customized tblproperties
     executeStatementOnDriver("drop table if exists " + tblName1, driver);
@@ -1596,7 +1597,8 @@ public class TestCompactor {
       "'transactional'='true'," +
       "'compactor.mapreduce.map.memory.mb'='2048'," + // 2048 MB memory for compaction map job
       "'compactorthreshold.hive.compactor.delta.num.threshold'='4'," +  // minor compaction if more than 4 delta dirs
-      "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.47'" + // major compaction if more than 47%
+      "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.47'," + // major compaction if more than 47%
+      "'compactor.hive.compactor.job.queue'='root.user2'" + // Override the system wide compactor queue for this table
       ")", driver);
 
     // Insert 5 rows to both tables
@@ -1641,6 +1643,7 @@ public class TestCompactor {
     t.run();
     JobConf job = t.getMrJob();
     Assert.assertEquals(2048, job.getMemoryForMapTask());  // 2048 comes from tblproperties
+    Assert.assertEquals("root.user2", job.getQueueName()); // Queue name comes from table properties
     // Compact ttp1
     stop = new AtomicBoolean(true);
     t = new Worker();
@@ -1651,6 +1654,7 @@ public class TestCompactor {
     t.run();
     job = t.getMrJob();
     Assert.assertEquals(1024, job.getMemoryForMapTask());  // 1024 is the default value
+    Assert.assertEquals("root.user1", job.getQueueName()); // The system wide compaction queue name
     // Clean up
     runCleaner(conf);
     rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -1702,7 +1706,8 @@ public class TestCompactor {
     executeStatementOnDriver("alter table " + tblName2 + " compact 'major'" +
       " with overwrite tblproperties (" +
       "'compactor.mapreduce.map.memory.mb'='3072'," +
-      "'tblprops.orc.compress.size'='3141')", driver);
+      "'tblprops.orc.compress.size'='3141'," +
+      "'compactor.hive.compactor.job.queue'='root.user2')", driver);
 
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(4, rsp.getCompacts().size());
@@ -1722,6 +1727,7 @@ public class TestCompactor {
     job = t.getMrJob();
     Assert.assertEquals(3072, job.getMemoryForMapTask());
     Assert.assertTrue(job.get("hive.compactor.table.props").contains("orc.compress.size4:3141"));
+    Assert.assertEquals("root.user2", job.getQueueName());
     /*createReader(FileSystem fs, Path path) throws IOException {
      */
     //we just ran Major compaction so we should have a base_x in tblName2 that has the new files

http://git-wip-us.apache.org/repos/asf/hive/blob/35278429/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 611f85a..92c74e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -150,11 +150,6 @@ public class CompactorMR {
     job.setOutputFormat(NullOutputFormat.class);
     job.setOutputCommitter(CompactorOutputCommitter.class);
 
-    String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
-    if(queueName != null && queueName.length() > 0) {
-      job.setQueueName(queueName);
-    }
-
     job.set(FINAL_LOCATION, sd.getLocation());
     job.set(TMP_LOCATION, generateTmpPath(sd));
     job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
@@ -167,6 +162,12 @@ public class CompactorMR {
     if (ci.properties != null) {
       overrideTblProps(job, t.getParameters(), ci.properties);
     }
+
+    String queueName = HiveConf.getVar(job, ConfVars.COMPACTOR_JOB_QUEUE);
+    if (queueName != null && queueName.length() > 0) {
+      job.setQueueName(queueName);
+    }
+
     setColumnTypes(job, sd.getCols());
     //with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were
     //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter