Posted to commits@hive.apache.org by ek...@apache.org on 2016/12/07 02:07:29 UTC

[1/3] hive git commit: HIVE-14895 CompactorMR.CompactorOutputCommitter race condition (Eugene Koifman, reviewed by Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/master c42bb86b8 -> 2a24612be


HIVE-14895 CompactorMR.CompactorOutputCommitter race condition (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e22e4112
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e22e4112
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e22e4112

Branch: refs/heads/master
Commit: e22e411254dd21ab7df5ae619aebe50d68a07625
Parents: c42bb86
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:21:54 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:21:54 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e22e4112/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c3e3982..9ac2964 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -837,13 +837,15 @@ public class CompactorMR {
     @Override
     public void commitJob(JobContext context) throws IOException {
       JobConf conf = ShimLoader.getHadoopShims().getJobConf(context);
-      Path tmpLocation = new Path(conf.get(TMP_LOCATION));
+      Path tmpLocation = new Path(conf.get(TMP_LOCATION));//this contains base_xxx or delta_xxx_yyy
       Path finalLocation = new Path(conf.get(FINAL_LOCATION));
       FileSystem fs = tmpLocation.getFileSystem(conf);
       LOG.debug("Moving contents of " + tmpLocation.toString() + " to " +
           finalLocation.toString());
 
-      FileStatus[] contents = fs.listStatus(tmpLocation);
+      FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
+      //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
+      //name is that we want to rename; leave it for another day
       for (int i = 0; i < contents.length; i++) {
         Path newPath = new Path(finalLocation, contents[i].getPath().getName());
         fs.rename(contents[i].getPath(), newPath);


[2/3] hive git commit: HIVE-15337 Enhance Show Compactions output with JobId and start time for attempted state (Eugene Koifman, reviewed by Wei Zheng)

Posted by ek...@apache.org.
HIVE-15337 Enhance Show Compactions output with JobId and start time for attempted state (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/252dd7e7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/252dd7e7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/252dd7e7

Branch: refs/heads/master
Commit: 252dd7e776c115bd96bc2673cfcb653ffad8d27a
Parents: e22e411
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:24:03 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:24:03 2016 -0800

----------------------------------------------------------------------
 .../metastore/txn/CompactionTxnHandler.java     | 40 ++++++++++++++++++--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 12 ++++--
 .../hadoop/hive/metastore/txn/TxnStore.java     |  7 ++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 ++++++--
 .../hive/ql/plan/ShowCompactionsDesc.java       |  2 +-
 .../hive/ql/txn/compactor/CompactorMR.java      | 13 ++++---
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  4 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  1 +
 .../queries/clientpositive/dbtxnmgr_showlocks.q |  6 +++
 .../clientpositive/dbtxnmgr_showlocks.q.out     | 20 ++++++++++
 10 files changed, 102 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index fde1b54..545244b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -860,7 +860,8 @@ class CompactionTxnHandler extends TxnHandler {
           //compactions are not happening.
           ci.state = ATTEMPTED_STATE;
           //this is not strictly accurate, but 'type' cannot be null.
-          ci.type = CompactionType.MINOR;
+          if(ci.type == null) { ci.type = CompactionType.MINOR; }
+          ci.start = getDbTime(dbConn);
         }
         else {
           ci.state = FAILED_STATE;
@@ -874,7 +875,7 @@ class CompactionTxnHandler extends TxnHandler {
         closeStmt(pStmt);
         dbConn.commit();
       } catch (SQLException e) {
-        LOG.error("Unable to delete from compaction queue " + e.getMessage());
+        LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         try {
@@ -883,7 +884,7 @@ class CompactionTxnHandler extends TxnHandler {
         catch(MetaException ex) {
           LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
         }
-        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+        LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
       } finally {
         close(rs, stmt, null);
         close(null, pStmt, dbConn);
@@ -892,7 +893,38 @@ class CompactionTxnHandler extends TxnHandler {
       markFailed(ci);
     }
   }
-
+  @Override
+  public void setHadoopJobId(String hadoopJobId, long id) {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+        LOG.debug("Going to execute <" + s + ">");
+        int updateCount = stmt.executeUpdate(s);
+        LOG.debug("Going to commit");
+        closeStmt(stmt);
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        try {
+          checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+        }
+        catch(MetaException ex) {
+          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+        }
+        LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+      } finally {
+        close(null, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      setHadoopJobId(hadoopJobId, id);
+    }
+  }
 }
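
The new setHadoopJobId() follows the handler's standard retry idiom: do the JDBC work,
roll back and consult checkRetryable() on SQLException, and let the outer catch of
RetryException re-enter the method. A stripped-down sketch of that idiom, assuming the
surrounding TxnHandler helpers (getDbConn, rollbackDBConn, checkRetryable, close, LOG)
and an illustrative method name:

    // Inner block: plain JDBC work. checkRetryable() throws RetryException
    // for transient failures (deadlock, lost connection); the outer catch
    // re-invokes the method, bounded by the handler's retry limit.
    private void updateWithRetry(String sql) {
      try {
        Connection dbConn = null;
        Statement stmt = null;
        try {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          stmt.executeUpdate(sql);
          dbConn.commit();
        } catch (SQLException e) {
          rollbackDBConn(dbConn);
          try {
            checkRetryable(dbConn, e, "updateWithRetry");
          } catch (MetaException ex) {
            LOG.error("Unable to connect to transaction database " + ex.getMessage());
          }
          LOG.error("updateWithRetry(" + sql + ") failed: " + e.getMessage(), e);
        } finally {
          close(null, stmt, dbConn);
        }
      } catch (RetryException e) {
        updateWithRetry(sql);
      }
    }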
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b0fa836..0c495ef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1509,6 +1509,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+          //-1 because 'null' literal doesn't work for all DBs...
           "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
           "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
           "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
@@ -1531,14 +1532,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               //do nothing to handle RU/D if we add another status
           }
           e.setWorkerid(rs.getString(6));
-          e.setStart(rs.getLong(7));
+          long start = rs.getLong(7);
+          if(!rs.wasNull()) {
+            e.setStart(start);
+          }
           long endTime = rs.getLong(8);
           if(endTime != -1) {
             e.setEndTime(endTime);
           }
           e.setRunAs(rs.getString(9));
           e.setHadoopJobId(rs.getString(10));
-          long id = rs.getLong(11);//for debugging
+          e.setId(rs.getLong(11));
           response.addToCompacts(e);
         }
         LOG.debug("Going to rollback");
@@ -1943,12 +1947,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
           sendRetrySignal = true;
         } else {
-          LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
+          LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
         }
       }
       else {
         //make sure we know we saw an error that we don't recognize
-        LOG.info("Non-retryable error: " + getMessage(e));
+        LOG.info("Non-retryable error in " + caller + " : " + getMessage(e));
       }
     }
     finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3c06517..170280e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -387,4 +387,11 @@ public interface TxnStore {
       public void releaseLocks();
     }
   }
+
+  /**
+   * Once a {@link java.util.concurrent.ThreadPoolExecutor.Worker} submits a job to the cluster,
+   * it calls this to update the metadata.
+   * @param id {@link CompactionInfo#id}
+   */
+  public void setHadoopJobId(String hadoopJobId, long id);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4b39eb9..493e1b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2740,6 +2740,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     ShowCompactResponse rsp = db.showCompactions();
 
     // Write the results into the file
+    final String noVal = " --- ";
+
     DataOutputStream os = getOutputStream(desc.getResFile());
     try {
       // Write a header
@@ -2756,6 +2758,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       os.writeBytes("Worker");
       os.write(separator);
       os.writeBytes("Start Time");
+      os.write(separator);
+      os.writeBytes("Duration(ms)");
+      os.write(separator);
+      os.writeBytes("HadoopJobId");
       os.write(terminator);
 
       if (rsp.getCompacts() != null) {
@@ -2765,16 +2771,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
           os.writeBytes(e.getTablename());
           os.write(separator);
           String part = e.getPartitionname();
-          os.writeBytes(part == null ? "NULL" : part);
+          os.writeBytes(part == null ? noVal : part);
           os.write(separator);
           os.writeBytes(e.getType().toString());
           os.write(separator);
           os.writeBytes(e.getState());
           os.write(separator);
           String wid = e.getWorkerid();
-          os.writeBytes(wid == null ? "NULL" : wid);
+          os.writeBytes(wid == null ? noVal : wid);
+          os.write(separator);
+          os.writeBytes(e.isSetStart() ? Long.toString(e.getStart()) : noVal);
+          os.write(separator);
+          os.writeBytes(e.isSetEndTime() ? Long.toString(e.getEndTime() - e.getStart()) : noVal);
           os.write(separator);
-          os.writeBytes(Long.toString(e.getStart()));
+          os.writeBytes(e.isSetHadoopJobId() ?  e.getHadoopJobId() : noVal);
           os.write(terminator);
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
index 94fd289..dc47a38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
@@ -28,7 +28,7 @@ public class ShowCompactionsDesc extends DDLDesc implements Serializable {
 
   private static final long serialVersionUID = 1L;
   private static final String schema = "dbname,tabname,partname,type,state,workerid," +
-      "starttime#string:string:string:string:string:string:string";
+      "starttime,duration,hadoopjobid#string:string:string:string:string:string:string:string:string";
 
   private String resFile;
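
The schema string packs the column names before the '#' and the colon-separated column
types after it, so each of the two new columns needs both a name and a matching "string"
type entry. A quick illustrative check of that invariant (not part of the commit):

    String schema = "dbname,tabname,partname,type,state,workerid," +
        "starttime,duration,hadoopjobid#string:string:string:string:string:string:string:string:string";
    String[] parts = schema.split("#");
    // 9 comma-separated names must line up with 9 colon-separated types
    assert parts[0].split(",").length == parts[1].split(":").length;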
 

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 9ac2964..2f25925 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -198,7 +199,7 @@ public class CompactorMR {
    * @throws java.io.IOException if the job fails
    */
   void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-           ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+           ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
 
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
@@ -232,7 +233,7 @@ public class CompactorMR {
         launchCompactionJob(jobMinorCompact,
           null, CompactionType.MINOR, null,
           parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-          maxDeltastoHandle, -1, conf);
+          maxDeltastoHandle, -1, conf, txnHandler, ci.id);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
@@ -270,14 +271,15 @@ public class CompactorMR {
     }
 
     launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
-      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf);
+      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, txnHandler, ci.id);
 
     su.gatherStats();
   }
   private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
                                    StringableList dirsToSearch,
                                    List<AcidUtils.ParsedDelta> parsedDeltas,
-                                   int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException {
+                                   int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf,
+                                   TxnStore txnHandler, long id) throws IOException {
     job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
     if(dirsToSearch == null) {
       dirsToSearch = new StringableList();
@@ -308,7 +310,8 @@ public class CompactorMR {
       "(current delta dirs count=" + curDirNumber +
       ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
     RunningJob rj = JobClient.runJob(job);
-    LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID());
+    LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
+    txnHandler.setHadoopJobId(rj.getID().toString(), id);
     rj.waitForCompletion();
   }
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 666f13b..2d6cce9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -163,14 +163,14 @@ public class Worker extends CompactorThread {
         launchedJob = true;
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+            mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+                mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 49ba667..9145cf3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -309,6 +309,7 @@ public class TestTxnCommands2 {
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
 
     // 3. Perform a delete.
     runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1");

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
index da8e448..24a42ea 100644
--- a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
+++ b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
@@ -21,4 +21,10 @@ show locks partitioned_acid_table partition (p='abc');
 
 show locks partitioned_acid_table partition (p='abc') extended;
 
+insert into partitioned_acid_table partition(p='abc') values(1,2);
+
+alter table partitioned_acid_table partition(p='abc') compact 'minor';
+
+show compactions;
+
 drop table partitioned_acid_table;

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index 3b683f8..4da2c87 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -51,6 +51,26 @@ PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
 POSTHOOK: type: SHOWLOCKS
 Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+PREHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: show compactions
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: show compactions
+POSTHOOK: type: SHOW COMPACTIONS
+Database	Table	Partition	Type	State	Worker	Start Time	Duration(ms)	HadoopJobId
+default	partitioned_acid_table	p=abc	MINOR	initiated	 --- 	 --- 	 --- 	 --- 
 PREHOOK: query: drop table partitioned_acid_table
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@partitioned_acid_table


[3/3] hive git commit: HIVE-12504 TxnHandler.abortTxn() should check if already aborted to improve message (Eugene Koifman, reviewed by Wei Zheng)

Posted by ek...@apache.org.
HIVE-12504 TxnHandler.abortTxn() should check if already aborted to improve message (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a24612b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a24612b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a24612b

Branch: refs/heads/master
Commit: 2a24612befbba2849f429653b6bf8de7461d2874
Parents: 252dd7e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:28:16 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:28:16 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  9 ++++--
 .../hadoop/hive/metastore/txn/TxnStore.java     |  2 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  2 ++
 .../hive/metastore/txn/TestTxnHandler.java      | 31 ++++++++++++++++++++
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  2 +-
 5 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0c495ef..ea46d84 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.HouseKeeperService;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
@@ -515,17 +516,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
+  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
     long txnid = rqst.getTxnid();
     try {
       Connection dbConn = null;
+      Statement stmt = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
-          throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
+          stmt = dbConn.createStatement();
+          ensureValidTxn(dbConn, txnid, stmt);
         }
 
         LOG.debug("Going to commit");
@@ -537,7 +540,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
-        closeDbConn(dbConn);
+        close(null, stmt, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
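
ensureValidTxn() is an existing helper not shown in this diff. Conceptually, and this is
an assumption inferred from the new tests below rather than from the helper itself, it
distinguishes the failure cases roughly like this sketch:

    // Conceptual sketch only. Committed transactions are removed from the
    // TXNS table, so "not found" covers both never-existed and already-
    // committed txns; the aborted-state marker value is assumed.
    private static void checkTxnState(Character state /* null = not in TXNS */, long txnid)
        throws NoSuchTxnException, TxnAbortedException {
      if (state == null) {
        throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
      }
      if (state == 'a') { // assumed marker for an aborted txn
        throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid)
            + " already aborted");
      }
    }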

http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 170280e..879ae55 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -97,7 +97,7 @@ public interface TxnStore {
    * @throws NoSuchTxnException
    * @throws MetaException
    */
-  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException;
+  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException;
 
   /**
    * Abort (rollback) a list of transactions in one request.

http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 867e445..203eae5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -406,6 +406,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     } catch (NoSuchTxnException e) {
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
+    } catch(TxnAbortedException e) {
+      throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);

http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index dbe1ce8..11cedb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
@@ -162,6 +163,36 @@ public class TestTxnHandler {
       saw[tid.intValue()] = true;
     }
     for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
+    txnHandler.commitTxn(new CommitTxnRequest(2));
+    boolean gotException = false;
+    try {
+      txnHandler.abortTxn(new AbortTxnRequest(1));
+    }
+    catch(TxnAbortedException ex) {
+      gotException = true;
+      Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(1) + " already aborted", ex.getMessage());
+    }
+    Assert.assertTrue(gotException);
+    gotException = false;
+    try {
+      txnHandler.abortTxn(new AbortTxnRequest(2));
+    }
+    catch(NoSuchTxnException ex) {
+      gotException = true;
+      //if this wasn't an empty txn, we'd get a better msg
+      //Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " already committed.", ex.getMessage());
+      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage());
+    }
+    Assert.assertTrue(gotException);
+    gotException = false;
+    try {
+      txnHandler.abortTxn(new AbortTxnRequest(3));
+    }
+    catch(NoSuchTxnException ex) {
+      gotException = true;
+      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(3), ex.getMessage());
+    }
+    Assert.assertTrue(gotException);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/2a24612b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 30d7b94..460bad5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -237,7 +237,7 @@ public class TestDbTxnManager {
       exception = ex;
     }
     Assert.assertNotNull("Expected exception2", exception);
-    Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_NO_SUCH_TRANSACTION, exception.getCanonicalErrorMsg());
+    Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
   }
 
   @Test