You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/05 21:41:21 UTC

[02/23] hive git commit: HIVE-11997 - Add ability to send Compaction Jobs to specific queue (Eugene Koifman, reviewed by Jason Dere)

HIVE-11997 - Add ability to send Compaction Jobs to specific queue (Eugene Koifman, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b1eb0c0f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b1eb0c0f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b1eb0c0f

Branch: refs/heads/llap
Commit: b1eb0c0f1c3fc0f503bc675281c8be8356d1f081
Parents: ff9822e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Oct 2 10:11:00 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Oct 2 10:11:00 2015 -0700

----------------------------------------------------------------------
 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java |  2 ++
 .../apache/hadoop/hive/ql/txn/compactor/CompactorMR.java  | 10 +++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b1eb0c0f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dffdb5c..e7ed07e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1571,6 +1571,8 @@ public class HiveConf extends Configuration {
 
     HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
+    COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
+      "Compaction jobs will be submitted.  Set to empty string to let Hadoop choose the queue."),
     HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
       new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
     HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",

http://git-wip-us.apache.org/repos/asf/hive/blob/b1eb0c0f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 02fa725..3ee9346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -117,6 +117,11 @@ public class CompactorMR {
     job.setInputFormat(CompactorInputFormat.class);
     job.setOutputFormat(NullOutputFormat.class);
     job.setOutputCommitter(CompactorOutputCommitter.class);
+    
+    String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
+    if(queueName != null && queueName.length() > 0) {
+      job.setQueueName(queueName);
+    }
 
     job.set(FINAL_LOCATION, sd.getLocation());
     job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
@@ -189,7 +194,10 @@ public class CompactorMR {
     LOG.debug("Setting maximume transaction to " + maxTxn);
 
     RunningJob rj = JobClient.runJob(job);
-    LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" + jobName + "' with jobID=" + rj.getID());
+    LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" +
+      jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " queue.  " +
+      "(current delta dirs count=" + dir.getCurrentDirectories().size() +
+      ", obsolete delta dirs count=" + dir.getObsolete());
     rj.waitForCompletion();
     su.gatherStats();
   }