You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/12/07 02:07:29 UTC
[1/3] hive git commit: HIVE-14895
CompactorMR.CompactorOutputCommitter race condition (Eugene Koifman,
reviewed by Wei Zheng)
Repository: hive
Updated Branches:
refs/heads/master c42bb86b8 -> 2a24612be
HIVE-14895 CompactorMR.CompactorOutputCommitter race condition (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e22e4112
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e22e4112
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e22e4112
Branch: refs/heads/master
Commit: e22e411254dd21ab7df5ae619aebe50d68a07625
Parents: c42bb86
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:21:54 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:21:54 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e22e4112/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c3e3982..9ac2964 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -837,13 +837,15 @@ public class CompactorMR {
@Override
public void commitJob(JobContext context) throws IOException {
JobConf conf = ShimLoader.getHadoopShims().getJobConf(context);
- Path tmpLocation = new Path(conf.get(TMP_LOCATION));
+ Path tmpLocation = new Path(conf.get(TMP_LOCATION));//this contains base_xxx or delta_xxx_yyy
Path finalLocation = new Path(conf.get(FINAL_LOCATION));
FileSystem fs = tmpLocation.getFileSystem(conf);
LOG.debug("Moving contents of " + tmpLocation.toString() + " to " +
finalLocation.toString());
- FileStatus[] contents = fs.listStatus(tmpLocation);
+ FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
+ //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
+ //name is that we want to rename; leave it for another day
for (int i = 0; i < contents.length; i++) {
Path newPath = new Path(finalLocation, contents[i].getPath().getName());
fs.rename(contents[i].getPath(), newPath);
[2/3] hive git commit: HIVE-15337 Enhance Show Compactions output
with JobId and start time for attempted state (Eugene Koifman,
reviewed by Wei Zheng)
Posted by ek...@apache.org.
HIVE-15337 Enhance Show Compactions output with JobId and start time for attempted state (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/252dd7e7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/252dd7e7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/252dd7e7
Branch: refs/heads/master
Commit: 252dd7e776c115bd96bc2673cfcb653ffad8d27a
Parents: e22e411
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:24:03 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:24:03 2016 -0800
----------------------------------------------------------------------
.../metastore/txn/CompactionTxnHandler.java | 40 ++++++++++++++++++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 12 ++++--
.../hadoop/hive/metastore/txn/TxnStore.java | 7 ++++
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 ++++++--
.../hive/ql/plan/ShowCompactionsDesc.java | 2 +-
.../hive/ql/txn/compactor/CompactorMR.java | 13 ++++---
.../hadoop/hive/ql/txn/compactor/Worker.java | 4 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 1 +
.../queries/clientpositive/dbtxnmgr_showlocks.q | 6 +++
.../clientpositive/dbtxnmgr_showlocks.q.out | 20 ++++++++++
10 files changed, 102 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index fde1b54..545244b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -860,7 +860,8 @@ class CompactionTxnHandler extends TxnHandler {
//compactions are not happening.
ci.state = ATTEMPTED_STATE;
//this is not strictly accurate, but 'type' cannot be null.
- ci.type = CompactionType.MINOR;
+ if(ci.type == null) { ci.type = CompactionType.MINOR; }
+ ci.start = getDbTime(dbConn);
}
else {
ci.state = FAILED_STATE;
@@ -874,7 +875,7 @@ class CompactionTxnHandler extends TxnHandler {
closeStmt(pStmt);
dbConn.commit();
} catch (SQLException e) {
- LOG.error("Unable to delete from compaction queue " + e.getMessage());
+ LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
try {
@@ -883,7 +884,7 @@ class CompactionTxnHandler extends TxnHandler {
catch(MetaException ex) {
LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
}
- LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+ LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
} finally {
close(rs, stmt, null);
close(null, pStmt, dbConn);
@@ -892,7 +893,38 @@ class CompactionTxnHandler extends TxnHandler {
markFailed(ci);
}
}
-
+ @Override
+ public void setHadoopJobId(String hadoopJobId, long id) {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+ LOG.debug("Going to execute <" + s + ">");
+ int updateCount = stmt.executeUpdate(s);
+ LOG.debug("Going to commit");
+ closeStmt(stmt);
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ try {
+ checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+ }
+ catch(MetaException ex) {
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+ }
+ LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+ } finally {
+ close(null, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ setHadoopJobId(hadoopJobId, id);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b0fa836..0c495ef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1509,6 +1509,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+ //-1 because 'null' literal doesn't work for all DBs...
"cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
"select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
"cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
@@ -1531,14 +1532,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//do nothing to handle RU/D if we add another status
}
e.setWorkerid(rs.getString(6));
- e.setStart(rs.getLong(7));
+ long start = rs.getLong(7);
+ if(!rs.wasNull()) {
+ e.setStart(start);
+ }
long endTime = rs.getLong(8);
if(endTime != -1) {
e.setEndTime(endTime);
}
e.setRunAs(rs.getString(9));
e.setHadoopJobId(rs.getString(10));
- long id = rs.getLong(11);//for debugging
+ e.setId(rs.getLong(11));
response.addToCompacts(e);
}
LOG.debug("Going to rollback");
@@ -1943,12 +1947,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
sendRetrySignal = true;
} else {
- LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
+ LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
}
}
else {
//make sure we know we saw an error that we don't recognize
- LOG.info("Non-retryable error: " + getMessage(e));
+ LOG.info("Non-retryable error in " + caller + " : " + getMessage(e));
}
}
finally {
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3c06517..170280e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -387,4 +387,11 @@ public interface TxnStore {
public void releaseLocks();
}
}
+
+ /**
+ * Once a compactor {@code Worker} submits a job to the cluster,
+ * it calls this to update the metadata.
+ * @param id {@link CompactionInfo#id}
+ */
+ public void setHadoopJobId(String hadoopJobId, long id);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4b39eb9..493e1b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2740,6 +2740,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
ShowCompactResponse rsp = db.showCompactions();
// Write the results into the file
+ final String noVal = " --- ";
+
DataOutputStream os = getOutputStream(desc.getResFile());
try {
// Write a header
@@ -2756,6 +2758,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
os.writeBytes("Worker");
os.write(separator);
os.writeBytes("Start Time");
+ os.write(separator);
+ os.writeBytes("Duration(ms)");
+ os.write(separator);
+ os.writeBytes("HadoopJobId");
os.write(terminator);
if (rsp.getCompacts() != null) {
@@ -2765,16 +2771,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
os.writeBytes(e.getTablename());
os.write(separator);
String part = e.getPartitionname();
- os.writeBytes(part == null ? "NULL" : part);
+ os.writeBytes(part == null ? noVal : part);
os.write(separator);
os.writeBytes(e.getType().toString());
os.write(separator);
os.writeBytes(e.getState());
os.write(separator);
String wid = e.getWorkerid();
- os.writeBytes(wid == null ? "NULL" : wid);
+ os.writeBytes(wid == null ? noVal : wid);
+ os.write(separator);
+ os.writeBytes(e.isSetStart() ? Long.toString(e.getStart()) : noVal);
+ os.write(separator);
+ os.writeBytes(e.isSetEndTime() ? Long.toString(e.getEndTime() - e.getStart()) : noVal);
os.write(separator);
- os.writeBytes(Long.toString(e.getStart()));
+ os.writeBytes(e.isSetHadoopJobId() ? e.getHadoopJobId() : noVal);
os.write(terminator);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
index 94fd289..dc47a38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
@@ -28,7 +28,7 @@ public class ShowCompactionsDesc extends DDLDesc implements Serializable {
private static final long serialVersionUID = 1L;
private static final String schema = "dbname,tabname,partname,type,state,workerid," +
- "starttime#string:string:string:string:string:string:string";
+ "starttime,duration,hadoopjobid#string:string:string:string:string:string:string:string:string";
private String resFile;
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 9ac2964..2f25925 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -198,7 +199,7 @@ public class CompactorMR {
* @throws java.io.IOException if the job fails
*/
void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
- ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+ ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
@@ -232,7 +233,7 @@ public class CompactorMR {
launchCompactionJob(jobMinorCompact,
null, CompactionType.MINOR, null,
parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
- maxDeltastoHandle, -1, conf);
+ maxDeltastoHandle, -1, conf, txnHandler, ci.id);
}
//now recompute state since we've done minor compactions and have different 'best' set of deltas
dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
@@ -270,14 +271,15 @@ public class CompactorMR {
}
launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
- dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf);
+ dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, txnHandler, ci.id);
su.gatherStats();
}
private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
StringableList dirsToSearch,
List<AcidUtils.ParsedDelta> parsedDeltas,
- int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException {
+ int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf,
+ TxnStore txnHandler, long id) throws IOException {
job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
if(dirsToSearch == null) {
dirsToSearch = new StringableList();
@@ -308,7 +310,8 @@ public class CompactorMR {
"(current delta dirs count=" + curDirNumber +
", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
RunningJob rj = JobClient.runJob(job);
- LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID());
+ LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
+ txnHandler.setHadoopJobId(rj.getID().toString(), id);
rj.waitForCompletion();
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 666f13b..2d6cce9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -163,14 +163,14 @@ public class Worker extends CompactorThread {
launchedJob = true;
try {
if (runJobAsSelf(runAs)) {
- mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+ mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+ mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
return null;
}
});
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 49ba667..9145cf3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -309,6 +309,7 @@ public class TestTxnCommands2 {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+ Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
// 3. Perform a delete.
runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1");
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
index da8e448..24a42ea 100644
--- a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
+++ b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
@@ -21,4 +21,10 @@ show locks partitioned_acid_table partition (p='abc');
show locks partitioned_acid_table partition (p='abc') extended;
+insert into partitioned_acid_table partition(p='abc') values(1,2);
+
+alter table partitioned_acid_table partition(p='abc') compact 'minor';
+
+show compactions;
+
drop table partitioned_acid_table;
http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index 3b683f8..4da2c87 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -51,6 +51,26 @@ PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
POSTHOOK: type: SHOWLOCKS
Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+PREHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: show compactions
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: show compactions
+POSTHOOK: type: SHOW COMPACTIONS
+Database Table Partition Type State Worker Start Time Duration(ms) HadoopJobId
+default partitioned_acid_table p=abc MINOR initiated --- --- --- ---
PREHOOK: query: drop table partitioned_acid_table
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@partitioned_acid_table
[3/3] hive git commit: HIVE-12504 TxnHandler.abortTxn() should check
if already aborted to improve message (Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
HIVE-12504 TxnHandler.abortTxn() should check if already aborted to improve message (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a24612b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a24612b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a24612b
Branch: refs/heads/master
Commit: 2a24612befbba2849f429653b6bf8de7461d2874
Parents: 252dd7e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:28:16 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:28:16 2016 -0800
----------------------------------------------------------------------
.../hadoop/hive/metastore/txn/TxnHandler.java | 9 ++++--
.../hadoop/hive/metastore/txn/TxnStore.java | 2 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 2 ++
.../hive/metastore/txn/TestTxnHandler.java | 31 ++++++++++++++++++++
.../hive/ql/lockmgr/TestDbTxnManager.java | 2 +-
5 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0c495ef..ea46d84 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.HouseKeeperService;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.dbcp.PoolingDataSource;
@@ -515,17 +516,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
- public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
+ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
long txnid = rqst.getTxnid();
try {
Connection dbConn = null;
+ Statement stmt = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
LOG.debug("Going to rollback");
dbConn.rollback();
- throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
+ stmt = dbConn.createStatement();
+ ensureValidTxn(dbConn, txnid, stmt);
}
LOG.debug("Going to commit");
@@ -537,7 +540,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
+ close(null, stmt, dbConn);
unlockInternal();
}
} catch (RetryException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 170280e..879ae55 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -97,7 +97,7 @@ public interface TxnStore {
* @throws NoSuchTxnException
* @throws MetaException
*/
- public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException;
+ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException;
/**
* Abort (rollback) a list of transactions in one request.
http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 867e445..203eae5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -406,6 +406,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
} catch (NoSuchTxnException e) {
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
+ } catch(TxnAbortedException e) {
+ throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
} catch (TException e) {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index dbe1ce8..11cedb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.txn;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
@@ -162,6 +163,36 @@ public class TestTxnHandler {
saw[tid.intValue()] = true;
}
for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
+ txnHandler.commitTxn(new CommitTxnRequest(2));
+ boolean gotException = false;
+ try {
+ txnHandler.abortTxn(new AbortTxnRequest(1));
+ }
+ catch(TxnAbortedException ex) {
+ gotException = true;
+ Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(1) + " already aborted", ex.getMessage());
+ }
+ Assert.assertTrue(gotException);
+ gotException = false;
+ try {
+ txnHandler.abortTxn(new AbortTxnRequest(2));
+ }
+ catch(NoSuchTxnException ex) {
+ gotException = true;
+ //if this wasn't an empty txn, we'd get a better msg
+ //Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " already committed.", ex.getMessage());
+ Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage());
+ }
+ Assert.assertTrue(gotException);
+ gotException = false;
+ try {
+ txnHandler.abortTxn(new AbortTxnRequest(3));
+ }
+ catch(NoSuchTxnException ex) {
+ gotException = true;
+ Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(3), ex.getMessage());
+ }
+ Assert.assertTrue(gotException);
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 30d7b94..460bad5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -237,7 +237,7 @@ public class TestDbTxnManager {
exception = ex;
}
Assert.assertNotNull("Expected exception2", exception);
- Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_NO_SUCH_TRANSACTION, exception.getCanonicalErrorMsg());
+ Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
}
@Test