You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/02 19:39:16 UTC
[1/2] hive git commit: HIVE-11997 - Add ability to send Compaction
Jobs to specific queue (Eugene Koifman, reviewed by Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master ff9822eb3 -> a1bac802a
HIVE-11997 - Add ability to send Compaction Jobs to specific queue (Eugene Koifman, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b1eb0c0f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b1eb0c0f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b1eb0c0f
Branch: refs/heads/master
Commit: b1eb0c0f1c3fc0f503bc675281c8be8356d1f081
Parents: ff9822e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Oct 2 10:11:00 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Oct 2 10:11:00 2015 -0700
----------------------------------------------------------------------
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 2 ++
.../apache/hadoop/hive/ql/txn/compactor/CompactorMR.java | 10 +++++++++-
2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b1eb0c0f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dffdb5c..e7ed07e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1571,6 +1571,8 @@ public class HiveConf extends Configuration {
HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
+ COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
+ "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
http://git-wip-us.apache.org/repos/asf/hive/blob/b1eb0c0f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 02fa725..3ee9346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -117,6 +117,11 @@ public class CompactorMR {
job.setInputFormat(CompactorInputFormat.class);
job.setOutputFormat(NullOutputFormat.class);
job.setOutputCommitter(CompactorOutputCommitter.class);
+
+ String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
+ if(queueName != null && queueName.length() > 0) {
+ job.setQueueName(queueName);
+ }
job.set(FINAL_LOCATION, sd.getLocation());
job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
@@ -189,7 +194,10 @@ public class CompactorMR {
LOG.debug("Setting maximume transaction to " + maxTxn);
RunningJob rj = JobClient.runJob(job);
- LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" + jobName + "' with jobID=" + rj.getID());
+ LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" +
+ jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " queue. " +
+ "(current delta dirs count=" + dir.getCurrentDirectories().size() +
+ ", obsolete delta dirs count=" + dir.getObsolete());
rj.waitForCompletion();
su.gatherStats();
}
[2/2] hive git commit: HIVE-11998 - Improve Compaction process
logging (Eugene Koifman, reviewed by Jason Dere)
Posted by ek...@apache.org.
HIVE-11998 - Improve Compaction process logging (Eugene Koifman, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a1bac802
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a1bac802
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a1bac802
Branch: refs/heads/master
Commit: a1bac802a21efef8a2c10d616b2aeb680ffedd9c
Parents: b1eb0c0
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Oct 2 10:12:29 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Oct 2 10:12:29 2015 -0700
----------------------------------------------------------------------
.../metastore/txn/CompactionTxnHandler.java | 36 ++++++++++++++------
.../hive/ql/txn/AcidHouseKeeperService.java | 5 +--
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 3 +-
.../hive/ql/txn/compactor/CompactorThread.java | 9 ++---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 5 ++-
5 files changed, 38 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 328a65c..44ee5c6 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -122,8 +122,9 @@ public class CompactionTxnHandler extends TxnHandler {
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
LOG.debug("Going to execute update <" + s + ">");
- if (stmt.executeUpdate(s) != 1) {
- LOG.error("Unable to update compaction record");
+ int updCnt = stmt.executeUpdate(s);
+ if (updCnt != 1) {
+ LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ". updCnt=" + updCnt);
LOG.debug("Going to rollback");
dbConn.rollback();
}
@@ -182,8 +183,10 @@ public class CompactionTxnHandler extends TxnHandler {
s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
"cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
- if (stmt.executeUpdate(s) != 1) {
- LOG.error("Unable to update compaction record");
+ int updCount = stmt.executeUpdate(s);
+ if (updCount != 1) {
+ LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
+ info + ". updCnt=" + updCount);
LOG.debug("Going to rollback");
dbConn.rollback();
}
@@ -221,8 +224,9 @@ public class CompactionTxnHandler extends TxnHandler {
String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
"cq_worker_id = null where cq_id = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
- if (stmt.executeUpdate(s) != 1) {
- LOG.error("Unable to update compaction record");
+ int updCnt = stmt.executeUpdate(s);
+ if (updCnt != 1) {
+ LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt);
LOG.debug("Going to rollback");
dbConn.rollback();
}
@@ -298,6 +302,17 @@ public class CompactionTxnHandler extends TxnHandler {
/**
* This will remove an entry from the queue after
* it has been compacted.
+ *
+ * todo: possibly a problem? Worker will start with DB in state X (wrt this partition).
+ * While it's working, more txns will happen against the partition it's compacting.
+ * Then this will delete state up to X and since then. There may be new delta files created
+ * between compaction starting and cleaning. These will not be compacted until more
+ * transactions happen. So this should ideally only delete
+ * up to the TXN_ID that was compacted (i.e. HWM in Worker?). Then this can also run
+ * at READ_COMMITTED.
+ *
+ * Also, by using this method when Worker fails, we prevent future compactions from
+ * running until more data is written to the table or compaction is invoked explicitly.
* @param info info on the compaction entry to remove
*/
public void markCleaned(CompactionInfo info) throws MetaException {
@@ -309,8 +324,9 @@ public class CompactionTxnHandler extends TxnHandler {
stmt = dbConn.createStatement();
String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
- if (stmt.executeUpdate(s) != 1) {
- LOG.error("Unable to delete compaction record");
+ int updCount = stmt.executeUpdate(s);
+ if (updCount != 1) {
+ LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount);
LOG.debug("Going to rollback");
dbConn.rollback();
}
@@ -348,7 +364,7 @@ public class CompactionTxnHandler extends TxnHandler {
else buf.append(", ");
buf.append(id);
}
-
+ //because 1 txn may include different partitions/tables even in auto commit mode
buf.append(") and tc_database = '");
buf.append(info.dbname);
buf.append("' and tc_table = '");
@@ -415,7 +431,7 @@ public class CompactionTxnHandler extends TxnHandler {
String bufStr = buf.toString();
LOG.debug("Going to execute update <" + bufStr + ">");
int rc = stmt.executeUpdate(bufStr);
- LOG.debug("Removed " + rc + " records from txns");
+ LOG.info("Removed " + rc + " empty Aborted transactions: " + txnids + " from TXNS");
LOG.debug("Going to commit");
dbConn.commit();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index d22ca8d..23a77e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -84,9 +84,10 @@ public class AcidHouseKeeperService implements HouseKeeperService {
@Override
public void run() {
try {
+ long startTime = System.currentTimeMillis();
txnHandler.performTimeOuts();
- owner.isAliveCounter.incrementAndGet();
- LOG.info("timeout reaper ran");
+ int count = owner.isAliveCounter.incrementAndGet();
+ LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
}
catch(Throwable t) {
LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);
http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 16d2c81..622bf54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -212,7 +212,7 @@ public class Cleaner extends CompactorThread {
if (runJobAsSelf(ci.runAs)) {
removeFiles(location, txnList);
} else {
- LOG.info("Cleaning as user " + ci.runAs);
+ LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -245,6 +245,7 @@ public class Cleaner extends CompactorThread {
", that hardly seems right.");
return;
}
+ LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location);
FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
for (Path dead : filesToDelete) {
http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 38cd95e..c956f58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -119,8 +119,8 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
throw e;
}
if (parts.size() != 1) {
- LOG.error(ci.getFullPartitionName() + " does not refer to a single partition");
- throw new MetaException("Too many partitions");
+ LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts);
+ throw new MetaException("Too many partitions for : " + ci.getFullPartitionName());
}
return parts.get(0);
} else {
@@ -179,8 +179,9 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
return wrapper.get(0);
}
}
- LOG.error("Unable to stat file as either current user or table owner, giving up");
- throw new IOException("Unable to stat file");
+ LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() +
+ ") or table owner(" + t.getOwner() + "), giving up");
+ throw new IOException("Unable to stat file: " + p);
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 9bf725d..f265311 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -99,7 +99,7 @@ public class Initiator extends CompactorThread {
// check if no compaction set for this table
if (noAutoCompactSet(t)) {
- LOG.info("Table " + tableName(t) + " marked true so we will not compact it.");
+ LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it.");
continue;
}
@@ -297,11 +297,10 @@ public class Initiator extends CompactorThread {
}
private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws MetaException {
- String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName();
- LOG.info(s);
CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type);
if (ci.partName != null) rqst.setPartitionname(ci.partName);
rqst.setRunas(runAs);
+ LOG.info("Requesting compaction: " + rqst);
txnHandler.compact(rqst);
}