Posted to commits@hive.apache.org by ek...@apache.org on 2016/12/07 02:07:30 UTC
[2/3] hive git commit: HIVE-15337 Enhance Show Compactions output with JobId and start time for attempted state (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/252dd7e7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/252dd7e7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/252dd7e7
Branch: refs/heads/master
Commit: 252dd7e776c115bd96bc2673cfcb653ffad8d27a
Parents: e22e411
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:24:03 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:24:03 2016 -0800
----------------------------------------------------------------------
.../metastore/txn/CompactionTxnHandler.java | 40 ++++++++++++++++++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 12 ++++--
.../hadoop/hive/metastore/txn/TxnStore.java | 7 ++++
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 ++++++--
.../hive/ql/plan/ShowCompactionsDesc.java | 2 +-
.../hive/ql/txn/compactor/CompactorMR.java | 13 ++++---
.../hadoop/hive/ql/txn/compactor/Worker.java | 4 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 1 +
.../queries/clientpositive/dbtxnmgr_showlocks.q | 6 +++
.../clientpositive/dbtxnmgr_showlocks.q.out | 20 ++++++++++
10 files changed, 102 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index fde1b54..545244b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -860,7 +860,8 @@ class CompactionTxnHandler extends TxnHandler {
//compactions are not happening.
ci.state = ATTEMPTED_STATE;
//this is not strictly accurate, but 'type' cannot be null.
- ci.type = CompactionType.MINOR;
+ if(ci.type == null) { ci.type = CompactionType.MINOR; }
+ ci.start = getDbTime(dbConn);
}
else {
ci.state = FAILED_STATE;
@@ -874,7 +875,7 @@ class CompactionTxnHandler extends TxnHandler {
closeStmt(pStmt);
dbConn.commit();
} catch (SQLException e) {
- LOG.error("Unable to delete from compaction queue " + e.getMessage());
+ LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
try {
@@ -883,7 +884,7 @@ class CompactionTxnHandler extends TxnHandler {
catch(MetaException ex) {
LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
}
- LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+ LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
} finally {
close(rs, stmt, null);
close(null, pStmt, dbConn);
@@ -892,7 +893,38 @@ class CompactionTxnHandler extends TxnHandler {
markFailed(ci);
}
}
-
+ @Override
+ public void setHadoopJobId(String hadoopJobId, long id) {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+ LOG.debug("Going to execute <" + s + ">");
+ int updateCount = stmt.executeUpdate(s);
+ LOG.debug("Going to commit");
+ closeStmt(stmt);
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ try {
+ checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+ }
+ catch(MetaException ex) {
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+ }
+ LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+ } finally {
+ close(null, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ setHadoopJobId(hadoopJobId, id);
+ }
+ }
}
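
For reference, the same update can be expressed with a PreparedStatement, which avoids hand-quoting the job id. This is a minimal standalone sketch, not the committed code above (which builds the SQL string directly and routes SQLExceptions through checkRetryable, retrying itself on RetryException); only the COMPACTION_QUEUE columns come from this commit, the DataSource is an assumption:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    public class HadoopJobIdRecorder {
      private final DataSource ds;  // assumed: any JDBC pool pointing at the metastore DB

      public HadoopJobIdRecorder(DataSource ds) { this.ds = ds; }

      /** Records the MR job id for the compaction queue entry with CQ_ID = id. */
      public void setHadoopJobId(String hadoopJobId, long id) throws SQLException {
        String sql = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = ? where CQ_ID = ?";
        try (Connection dbConn = ds.getConnection();
             PreparedStatement ps = dbConn.prepareStatement(sql)) {
          dbConn.setAutoCommit(false);  // the handler runs READ_COMMITTED with autocommit off
          ps.setString(1, hadoopJobId);
          ps.setLong(2, id);
          ps.executeUpdate();
          dbConn.commit();
        }
      }
    }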
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b0fa836..0c495ef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1509,6 +1509,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+ //-1 because 'null' literal doesn't work for all DBs...
"cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
"select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
"cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
@@ -1531,14 +1532,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//do nothing to handle RU/D if we add another status
}
e.setWorkerid(rs.getString(6));
- e.setStart(rs.getLong(7));
+ long start = rs.getLong(7);
+ if(!rs.wasNull()) {
+ e.setStart(start);
+ }
long endTime = rs.getLong(8);
if(endTime != -1) {
e.setEndTime(endTime);
}
e.setRunAs(rs.getString(9));
e.setHadoopJobId(rs.getString(10));
- long id = rs.getLong(11);//for debugging
+ e.setId(rs.getLong(11));
response.addToCompacts(e);
}
LOG.debug("Going to rollback");
@@ -1943,12 +1947,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
sendRetrySignal = true;
} else {
- LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
+ LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
}
}
else {
//make sure we know we saw an error that we don't recognize
- LOG.info("Non-retryable error: " + getMessage(e));
+ LOG.info("Non-retryable error in " + caller + " : " + getMessage(e));
}
}
finally {
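
The getLong(7)/wasNull() pair introduced above is the standard JDBC idiom for a nullable numeric column: getLong returns 0 for SQL NULL, so the value is only trustworthy after checking wasNull. The idiom in isolation (a sketch with the CQ_START column name from this diff):

    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class NullableColumns {
      /** Returns CQ_START as a Long, or null if the column was SQL NULL. */
      static Long readStart(ResultSet rs) throws SQLException {
        long start = rs.getLong("CQ_START");  // returns 0 for SQL NULL...
        return rs.wasNull() ? null : start;   // ...so wasNull() disambiguates 0 from NULL
      }
    }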
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3c06517..170280e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -387,4 +387,11 @@ public interface TxnStore {
public void releaseLocks();
}
}
+
+ /**
+ * Once a compactor {@link org.apache.hadoop.hive.ql.txn.compactor.Worker} submits a job to the cluster,
+ * it calls this to update the metadata.
+ * @param id {@link CompactionInfo#id}
+ */
+ public void setHadoopJobId(String hadoopJobId, long id);
}
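
Since the contract is just "persist the id keyed by CompactionInfo#id", a trivial in-memory version is enough for unit tests that only need to observe the recorded id. A hypothetical sketch, not part of this commit:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class InMemoryJobIds {
      private final Map<Long, String> jobIds = new ConcurrentHashMap<>();

      /** Mirrors TxnStore.setHadoopJobId for tests: keyed by CompactionInfo#id. */
      public void setHadoopJobId(String hadoopJobId, long id) {
        jobIds.put(id, hadoopJobId);
      }

      public String getHadoopJobId(long id) { return jobIds.get(id); }
    }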
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4b39eb9..493e1b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2740,6 +2740,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
ShowCompactResponse rsp = db.showCompactions();
// Write the results into the file
+ final String noVal = " --- ";
+
DataOutputStream os = getOutputStream(desc.getResFile());
try {
// Write a header
@@ -2756,6 +2758,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
os.writeBytes("Worker");
os.write(separator);
os.writeBytes("Start Time");
+ os.write(separator);
+ os.writeBytes("Duration(ms)");
+ os.write(separator);
+ os.writeBytes("HadoopJobId");
os.write(terminator);
if (rsp.getCompacts() != null) {
@@ -2765,16 +2771,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
os.writeBytes(e.getTablename());
os.write(separator);
String part = e.getPartitionname();
- os.writeBytes(part == null ? "NULL" : part);
+ os.writeBytes(part == null ? noVal : part);
os.write(separator);
os.writeBytes(e.getType().toString());
os.write(separator);
os.writeBytes(e.getState());
os.write(separator);
String wid = e.getWorkerid();
- os.writeBytes(wid == null ? "NULL" : wid);
+ os.writeBytes(wid == null ? noVal : wid);
+ os.write(separator);
+ os.writeBytes(e.isSetStart() ? Long.toString(e.getStart()) : noVal);
+ os.write(separator);
+ os.writeBytes(e.isSetEndTime() ? Long.toString(e.getEndTime() - e.getStart()) : noVal);
os.write(separator);
- os.writeBytes(Long.toString(e.getStart()));
+ os.writeBytes(e.isSetHadoopJobId() ? e.getHadoopJobId() : noVal);
os.write(terminator);
}
}
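
The row-writing logic above boils down to: print the " --- " placeholder for any optional field that is unset, and derive Duration(ms) from the two timestamps. A condensed sketch of just that decision (assuming, as the data model implies, that start is set whenever end is):

    public class CompactRow {
      private static final String noVal = " --- ";

      /** Formats the two timing columns the way the new SHOW COMPACTIONS output does. */
      static String timingColumns(boolean startSet, long start, boolean endSet, long end) {
        String startCol = startSet ? Long.toString(start) : noVal;
        // Duration(ms) is only meaningful once the compaction has an end time.
        String durationCol = endSet ? Long.toString(end - start) : noVal;
        return startCol + "\t" + durationCol;
      }
    }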
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
index 94fd289..dc47a38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
@@ -28,7 +28,7 @@ public class ShowCompactionsDesc extends DDLDesc implements Serializable {
private static final long serialVersionUID = 1L;
private static final String schema = "dbname,tabname,partname,type,state,workerid," +
- "starttime#string:string:string:string:string:string:string";
+ "starttime,duration,hadoopjobid#string:string:string:string:string:string:string:string:string";
private String resFile;
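
The schema string follows the DDLDesc convention: comma-separated column names, then '#', then colon-separated column types, one per column. A quick standalone check that the two halves stay in sync after adding the duration and hadoopjobid columns (plain Java, not part of the commit):

    public class SchemaCheck {
      public static void main(String[] args) {
        String schema = "dbname,tabname,partname,type,state,workerid,"
            + "starttime,duration,hadoopjobid#string:string:string:string:string:string:string:string:string";
        String[] parts = schema.split("#");
        int names = parts[0].split(",").length;  // 9 column names
        int types = parts[1].split(":").length;  // 9 column types
        System.out.println(names == types ? "in sync (" + names + " columns)" : "MISMATCH");
      }
    }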
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 9ac2964..2f25925 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -198,7 +199,7 @@ public class CompactorMR {
* @throws java.io.IOException if the job fails
*/
void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
- ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+ ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
@@ -232,7 +233,7 @@ public class CompactorMR {
launchCompactionJob(jobMinorCompact,
null, CompactionType.MINOR, null,
parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
- maxDeltastoHandle, -1, conf);
+ maxDeltastoHandle, -1, conf, txnHandler, ci.id);
}
//now recompute state since we've done minor compactions and have different 'best' set of deltas
dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
@@ -270,14 +271,15 @@ public class CompactorMR {
}
launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
- dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf);
+ dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, txnHandler, ci.id);
su.gatherStats();
}
private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
StringableList dirsToSearch,
List<AcidUtils.ParsedDelta> parsedDeltas,
- int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException {
+ int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf,
+ TxnStore txnHandler, long id) throws IOException {
job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
if(dirsToSearch == null) {
dirsToSearch = new StringableList();
@@ -308,7 +310,8 @@ public class CompactorMR {
"(current delta dirs count=" + curDirNumber +
", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
RunningJob rj = JobClient.runJob(job);
- LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID());
+ LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
+ txnHandler.setHadoopJobId(rj.getID().toString(), id);
rj.waitForCompletion();
}
/**
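
One subtlety: the old mapred-API JobClient.runJob submits the job and then polls it to completion before returning, so setHadoopJobId here runs once the job has already finished. If persisting the id right at submission time mattered, the non-blocking submitJob variant would do it; a sketch of that alternative, not what the commit does:

    import java.io.IOException;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;

    public class SubmitThenRecord {
      /** Submits without blocking, records the job id, then waits like runJob would. */
      static void launch(JobConf job, TxnStore txnHandler, long compactionId) throws IOException {
        JobClient jc = new JobClient(job);
        RunningJob rj = jc.submitJob(job);  // returns as soon as the job is accepted
        txnHandler.setHadoopJobId(rj.getID().toString(), compactionId);
        rj.waitForCompletion();             // then block until the job finishes
      }
    }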
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 666f13b..2d6cce9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -163,14 +163,14 @@ public class Worker extends CompactorThread {
launchedJob = true;
try {
if (runJobAsSelf(runAs)) {
- mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+ mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+ mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
return null;
}
});
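
The Worker change only threads txnHandler through both call sites; the surrounding structure is the standard Hadoop proxy-user pattern, in which the service impersonates the table owner using its own login credentials. A minimal standalone sketch of that pattern (hypothetical method names; requires the usual hadoop.proxyuser.* configuration to be in place):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class RunAsOwner {
      static void runAs(String tableOwner, Runnable action) throws Exception {
        // Impersonate the table owner on top of the service's own login user.
        UserGroupInformation ugi = UserGroupInformation.createProxyUser(
            tableOwner, UserGroupInformation.getLoginUser());
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
          action.run();  // e.g. mr.run(...) in the Worker above
          return null;
        });
      }
    }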
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 49ba667..9145cf3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -309,6 +309,7 @@ public class TestTxnCommands2 {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+ Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
// 3. Perform a delete.
runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1");
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
index da8e448..24a42ea 100644
--- a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
+++ b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
@@ -21,4 +21,10 @@ show locks partitioned_acid_table partition (p='abc');
show locks partitioned_acid_table partition (p='abc') extended;
+insert into partitioned_acid_table partition(p='abc') values(1,2);
+
+alter table partitioned_acid_table partition(p='abc') compact 'minor';
+
+show compactions;
+
drop table partitioned_acid_table;
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index 3b683f8..4da2c87 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -51,6 +51,26 @@ PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
POSTHOOK: type: SHOWLOCKS
Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+PREHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: show compactions
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: show compactions
+POSTHOOK: type: SHOW COMPACTIONS
+Database Table Partition Type State Worker Start Time Duration(ms) HadoopJobId
+default partitioned_acid_table p=abc MINOR initiated --- --- --- ---
PREHOOK: query: drop table partitioned_acid_table
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@partitioned_acid_table