Posted to commits@hive.apache.org by ek...@apache.org on 2016/12/07 02:07:30 UTC

[2/3] hive git commit: HIVE-15337 Enhance Show Compactions output with JobId and start time for attempted state (Eugene Koifman, reviewed by Wei Zheng)

HIVE-15337 Enhance Show Compactions output with JobId and start time for attempted state (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/252dd7e7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/252dd7e7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/252dd7e7

Branch: refs/heads/master
Commit: 252dd7e776c115bd96bc2673cfcb653ffad8d27a
Parents: e22e411
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Dec 6 16:24:03 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Dec 6 16:24:03 2016 -0800

----------------------------------------------------------------------
 .../metastore/txn/CompactionTxnHandler.java     | 40 ++++++++++++++++++--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 12 ++++--
 .../hadoop/hive/metastore/txn/TxnStore.java     |  7 ++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 ++++++--
 .../hive/ql/plan/ShowCompactionsDesc.java       |  2 +-
 .../hive/ql/txn/compactor/CompactorMR.java      | 13 ++++---
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  4 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  1 +
 .../queries/clientpositive/dbtxnmgr_showlocks.q |  6 +++
 .../clientpositive/dbtxnmgr_showlocks.q.out     | 20 ++++++++++
 10 files changed, 102 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index fde1b54..545244b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -860,7 +860,8 @@ class CompactionTxnHandler extends TxnHandler {
           //compactions are not happening.
           ci.state = ATTEMPTED_STATE;
           //this is not strictly accurate, but 'type' cannot be null.
-          ci.type = CompactionType.MINOR;
+          if(ci.type == null) { ci.type = CompactionType.MINOR; }
+          ci.start = getDbTime(dbConn);
         }
         else {
           ci.state = FAILED_STATE;
@@ -874,7 +875,7 @@ class CompactionTxnHandler extends TxnHandler {
         closeStmt(pStmt);
         dbConn.commit();
       } catch (SQLException e) {
-        LOG.error("Unable to delete from compaction queue " + e.getMessage());
+        LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         try {
@@ -883,7 +884,7 @@ class CompactionTxnHandler extends TxnHandler {
         catch(MetaException ex) {
           LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
         }
-        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+        LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
       } finally {
         close(rs, stmt, null);
         close(null, pStmt, dbConn);
@@ -892,7 +893,38 @@ class CompactionTxnHandler extends TxnHandler {
       markFailed(ci);
     }
   }
-
+  @Override
+  public void setHadoopJobId(String hadoopJobId, long id) {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+        LOG.debug("Going to execute <" + s + ">");
+        int updateCount = stmt.executeUpdate(s);
+        LOG.debug("Going to commit");
+        closeStmt(stmt);
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        try {
+          checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+        }
+        catch(MetaException ex) {
+          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+        }
+        LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+      } finally {
+        close(null, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      setHadoopJobId(hadoopJobId, id);
+    }
+  }
 }
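
The new setHadoopJobId() follows the handler's standard retry idiom, visible elsewhere in this file: run the statement, roll back on SQLException, let checkRetryable() decide whether to throw RetryException, and re-enter the method from the outer catch. A self-contained sketch of that shape, with hypothetical names (TransientException, maybeRetry) standing in for Hive's internals, and a PreparedStatement in place of the hand-quoted concatenation:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class RetryingUpdate {
  /** Hypothetical stand-in for TxnHandler's internal RetryException. */
  static final class TransientException extends Exception {}

  /** Same shape as the patch: try, classify the failure, re-enter on a transient one. */
  static void setJobId(Connection conn, String jobId, long compactionId) throws SQLException {
    try {
      try (PreparedStatement ps = conn.prepareStatement(
          "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = ? where CQ_ID = ?")) {
        // Parameter binding does the escaping that quoteString() does by hand in the patch.
        ps.setString(1, jobId);
        ps.setLong(2, compactionId);
        ps.executeUpdate();
        conn.commit();
      } catch (SQLException e) {
        conn.rollback();
        maybeRetry(e);   // throws TransientException when the error looks transient
        throw e;         // otherwise surface the original failure
      }
    } catch (TransientException e) {
      setJobId(conn, jobId, compactionId);   // re-enter, as the outer catch does in the patch
    }
  }

  /** Hypothetical classifier; checkRetryable() in TxnHandler is far more thorough. */
  static void maybeRetry(SQLException e) throws TransientException {
    if ("08S01".equals(e.getSQLState())) {   // communication-link failure
      throw new TransientException();
    }
  }
}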
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b0fa836..0c495ef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1509,6 +1509,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+          //-1 because 'null' literal doesn't work for all DBs...
           "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
           "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
           "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
@@ -1531,14 +1532,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               //do nothing to handle RU/D if we add another status
           }
           e.setWorkerid(rs.getString(6));
-          e.setStart(rs.getLong(7));
+          long start = rs.getLong(7);
+          if(!rs.wasNull()) {
+            e.setStart(start);
+          }
           long endTime = rs.getLong(8);
           if(endTime != -1) {
             e.setEndTime(endTime);
           }
           e.setRunAs(rs.getString(9));
           e.setHadoopJobId(rs.getString(10));
-          long id = rs.getLong(11);//for debugging
+          e.setId(rs.getLong(11));
           response.addToCompacts(e);
         }
         LOG.debug("Going to rollback");
@@ -1943,12 +1947,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
           sendRetrySignal = true;
         } else {
-          LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
+          LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
         }
       }
       else {
         //make sure we know we saw an error that we don't recognize
-        LOG.info("Non-retryable error: " + getMessage(e));
+        LOG.info("Non-retryable error in " + caller + " : " + getMessage(e));
       }
     }
     finally {
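
The wasNull() guard added above matters because JDBC's ResultSet.getLong() maps SQL NULL to 0, so a row whose start column is NULL would otherwise report a start time of 0 (the epoch) instead of no start time at all. This is the standard idiom for reading nullable numeric columns:

import java.sql.ResultSet;
import java.sql.SQLException;

final class JdbcNulls {
  /** Read a nullable BIGINT: getLong() returns 0 for SQL NULL, so check wasNull(). */
  static Long readNullableLong(ResultSet rs, int col) throws SQLException {
    long v = rs.getLong(col);        // 0 if the column was SQL NULL
    return rs.wasNull() ? null : v;  // wasNull() reports on the last column read
  }
}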

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3c06517..170280e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -387,4 +387,11 @@ public interface TxnStore {
       public void releaseLocks();
     }
   }
+
+  /**
+   * Once a {@link java.util.concurrent.ThreadPoolExecutor.Worker} submits a job to the cluster,
+   * it calls this to update the metadata.
+   * @param id {@link CompactionInfo#id}
+   */
+  public void setHadoopJobId(String hadoopJobId, long id);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4b39eb9..493e1b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2740,6 +2740,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     ShowCompactResponse rsp = db.showCompactions();
 
     // Write the results into the file
+    final String noVal = " --- ";
+
     DataOutputStream os = getOutputStream(desc.getResFile());
     try {
       // Write a header
@@ -2756,6 +2758,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       os.writeBytes("Worker");
       os.write(separator);
       os.writeBytes("Start Time");
+      os.write(separator);
+      os.writeBytes("Duration(ms)");
+      os.write(separator);
+      os.writeBytes("HadoopJobId");
       os.write(terminator);
 
       if (rsp.getCompacts() != null) {
@@ -2765,16 +2771,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
           os.writeBytes(e.getTablename());
           os.write(separator);
           String part = e.getPartitionname();
-          os.writeBytes(part == null ? "NULL" : part);
+          os.writeBytes(part == null ? noVal : part);
           os.write(separator);
           os.writeBytes(e.getType().toString());
           os.write(separator);
           os.writeBytes(e.getState());
           os.write(separator);
           String wid = e.getWorkerid();
-          os.writeBytes(wid == null ? "NULL" : wid);
+          os.writeBytes(wid == null ? noVal : wid);
+          os.write(separator);
+          os.writeBytes(e.isSetStart() ? Long.toString(e.getStart()) : noVal);
+          os.write(separator);
+          os.writeBytes(e.isSetEndTime() ? Long.toString(e.getEndTime() - e.getStart()) : noVal);
           os.write(separator);
-          os.writeBytes(Long.toString(e.getStart()));
+          os.writeBytes(e.isSetHadoopJobId() ?  e.getHadoopJobId() : noVal);
           os.write(terminator);
         }
       }
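
The isSetStart()/isSetEndTime()/isSetHadoopJobId() guards exist because ShowCompactResponseElement is Thrift-generated: optional fields carry an isSet flag, and reading an unset i64 field returns the default 0. Printing the " --- " placeholder instead keeps queue entries that have no job yet from showing a bogus start time or a negative duration. A toy illustration of the pattern (FakeElement is a stand-in, not the generated class):

final class ShowCompactFormatting {
  /** Sketch of a Thrift-style optional field; the real bean is generated code. */
  static final class FakeElement {
    private long start;
    private boolean startIsSet;   // Thrift keeps this in an isset bitfield
    void setStart(long s) { start = s; startIsSet = true; }
    boolean isSetStart() { return startIsSet; }
    long getStart() { return start; }
  }

  /** Unset i64 fields read back as 0, so guard with isSet before printing. */
  static String formatStart(FakeElement e, String noVal) {
    return e.isSetStart() ? Long.toString(e.getStart()) : noVal;
  }
}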

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
index 94fd289..dc47a38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java
@@ -28,7 +28,7 @@ public class ShowCompactionsDesc extends DDLDesc implements Serializable {
 
   private static final long serialVersionUID = 1L;
   private static final String schema = "dbname,tabname,partname,type,state,workerid," +
-      "starttime#string:string:string:string:string:string:string";
+      "starttime,duration,hadoopjobid#string:string:string:string:string:string:string:string:string";
 
   private String resFile;
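
The schema string is the compact "names#types" encoding that DDLDesc subclasses use to describe their result set: column names comma-separated before the '#', column types colon-separated after it, and the two lists must stay in step — hence the two new names (duration, hadoopjobid) paired with two new ":string" entries, for nine of each. A small consistency check of that invariant (a sketch, not Hive code):

final class SchemaStrings {
  /** The two halves of a "names#types" schema must have equal length. */
  static void checkSchema(String schema) {
    String[] halves = schema.split("#", 2);
    int names = halves[0].split(",").length;
    int types = halves[1].split(":").length;
    if (names != types) {
      throw new IllegalStateException(names + " names vs " + types + " types");
    }
  }
}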
 

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 9ac2964..2f25925 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -198,7 +199,7 @@ public class CompactorMR {
    * @throws java.io.IOException if the job fails
    */
   void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-           ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+           ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
 
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
@@ -232,7 +233,7 @@ public class CompactorMR {
         launchCompactionJob(jobMinorCompact,
           null, CompactionType.MINOR, null,
           parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-          maxDeltastoHandle, -1, conf);
+          maxDeltastoHandle, -1, conf, txnHandler, ci.id);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
@@ -270,14 +271,15 @@ public class CompactorMR {
     }
 
     launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
-      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf);
+      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, txnHandler, ci.id);
 
     su.gatherStats();
   }
   private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
                                    StringableList dirsToSearch,
                                    List<AcidUtils.ParsedDelta> parsedDeltas,
-                                   int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException {
+                                   int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf,
+                                   TxnStore txnHandler, long id) throws IOException {
     job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
     if(dirsToSearch == null) {
       dirsToSearch = new StringableList();
@@ -308,7 +310,8 @@ public class CompactorMR {
       "(current delta dirs count=" + curDirNumber +
       ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
     RunningJob rj = JobClient.runJob(job);
-    LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID());
+    LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
+    txnHandler.setHadoopJobId(rj.getID().toString(), id);
     rj.waitForCompletion();
   }
   /**
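
Persisting rj.getID() ties the COMPACTION_QUEUE row to the cluster-side MapReduce job so SHOW COMPACTIONS can report it. Note that the classic-API JobClient.runJob() polls the submitted job to completion before returning; if the goal is to have the id visible while the job is still running, the non-blocking submitJob() hands back the same RunningJob up front. A sketch under that assumption:

import java.io.IOException;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

final class SubmitAndRecord {
  /** Sketch: persist the JobID at submission time rather than after completion. */
  static void run(JobConf job, TxnStore txnHandler, long compactionId) throws IOException {
    JobClient jc = new JobClient(job);      // submitJob(), unlike runJob(), returns immediately
    RunningJob rj = jc.submitJob(job);
    txnHandler.setHadoopJobId(rj.getID().toString(), compactionId);  // the method this patch adds
    rj.waitForCompletion();                 // then block, as the patch does
  }
}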

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 666f13b..2d6cce9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -163,14 +163,14 @@ public class Worker extends CompactorThread {
         launchedJob = true;
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+            mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
+                mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
                 return null;
               }
             });
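
The only change here is threading txnHandler through to mr.run(), including inside the doAs block; that works because the anonymous PrivilegedExceptionAction can capture any (effectively) final local or any field of the enclosing Worker. A reduced sketch of the proxy-user idiom, outside Hive:

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

final class ProxyRun {
  /** Reduced form of the proxy-user idiom in Worker: impersonate the table owner. */
  static void runAsOwner(String owner, Runnable work) throws Exception {
    UserGroupInformation ugi =
        UserGroupInformation.createProxyUser(owner, UserGroupInformation.getLoginUser());
    ugi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() {
        work.run();   // captured locals like 'work' must be (effectively) final
        return null;
      }
    });
  }
}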

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 49ba667..9145cf3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -309,6 +309,7 @@ public class TestTxnCommands2 {
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
 
     // 3. Perform a delete.
     runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1");

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
index da8e448..24a42ea 100644
--- a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
+++ b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q
@@ -21,4 +21,10 @@ show locks partitioned_acid_table partition (p='abc');
 
 show locks partitioned_acid_table partition (p='abc') extended;
 
+insert into partitioned_acid_table partition(p='abc') values(1,2);
+
+alter table partitioned_acid_table partition(p='abc') compact 'minor';
+
+show compactions;
+
 drop table partitioned_acid_table;

http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index 3b683f8..4da2c87 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -51,6 +51,26 @@ PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
 POSTHOOK: type: SHOWLOCKS
 Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+PREHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@partitioned_acid_table@p=abc
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: query: show compactions
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: show compactions
+POSTHOOK: type: SHOW COMPACTIONS
+Database	Table	Partition	Type	State	Worker	Start Time	Duration(ms)	HadoopJobId
+default	partitioned_acid_table	p=abc	MINOR	initiated	 --- 	 --- 	 --- 	 --- 
 PREHOOK: query: drop table partitioned_acid_table
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@partitioned_acid_table