Posted to commits@hive.apache.org by xu...@apache.org on 2015/11/30 01:11:51 UTC

[54/91] [abbrv] hive git commit: HIVE-12389 CompactionTxnHandler.cleanEmptyAbortedTxns() should safeguard against huge IN clauses (Eugene Koifman, reviewed by Jason Dere)

HIVE-12389 CompactionTxnHandler.cleanEmptyAbortedTxns() should safeguard against huge IN clauses (Eugene Koifman, reviewed by Jason Dere)
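
For context: the patch replaces a single unbounded "delete from TXNS where txn_id in (...)" statement with fixed-size batches. An IN list that grows with the number of empty aborted transactions can exceed per-database limits (Oracle, for instance, rejects IN lists longer than 1000 elements with ORA-01795) and can produce arbitrarily long SQL strings. A minimal sketch of the batching idiom follows; the class and method names are illustrative, not the committed code:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only -- names are hypothetical, not the committed code.
    public class InClauseBatcher {
      static final int BATCH_SIZE = 1000;  // mirrors TIMED_OUT_TXN_ABORT_BATCH_SIZE

      // Split one unbounded "IN (...)" delete into statements holding at most
      // BATCH_SIZE ids each.
      static List<String> buildDeleteStatements(List<Long> txnIds) {
        List<String> statements = new ArrayList<>();
        for (int start = 0; start < txnIds.size(); start += BATCH_SIZE) {
          int end = Math.min(start + BATCH_SIZE, txnIds.size());
          StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
          for (long id : txnIds.subList(start, end)) {
            buf.append(id).append(',');
          }
          buf.setCharAt(buf.length() - 1, ')');  // overwrite the trailing comma
          statements.add(buf.toString());
        }
        return statements;
      }
    }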


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/695d905b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/695d905b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/695d905b

Branch: refs/heads/spark
Commit: 695d905bd3fb27ffb04b28e11d5bd7210321b755
Parents: 8e9bae2
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Nov 23 08:18:07 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Nov 23 08:18:07 2015 -0800

----------------------------------------------------------------------
 .../metastore/txn/CompactionTxnHandler.java     | 47 ++++++++++++--------
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 11 +++--
 .../hive/ql/txn/compactor/TestInitiator.java    |  5 ++-
 3 files changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/695d905b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 5e4c7be..3e0e656 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -361,14 +361,13 @@ public class CompactionTxnHandler extends TxnHandler {
             "marking compaction entry as clean!");
         }
 
-        //todo: add distinct in query
-        s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
+        s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
           TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
           info.tableName + "'";
         if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
         LOG.debug("Going to execute update <" + s + ">");
         rs = stmt.executeQuery(s);
-        Set<Long> txnids = new HashSet<Long>();
+        List<Long> txnids = new ArrayList<>();
         while (rs.next()) txnids.add(rs.getLong(1));
         if (txnids.size() > 0) {
 
@@ -437,23 +436,21 @@ public class CompactionTxnHandler extends TxnHandler {
           "txn_state = '" + TXN_ABORTED + "'";
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
-        Set<Long> txnids = new HashSet<Long>();
+        List<Long> txnids = new ArrayList<>();
         while (rs.next()) txnids.add(rs.getLong(1));
-        if (txnids.size() > 0) {
-          StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
-          boolean first = true;
-          for (long tid : txnids) {
-            if (first) first = false;
-            else buf.append(", ");
-            buf.append(tid);
-          }
-          buf.append(")");
-          String bufStr = buf.toString();
-          LOG.debug("Going to execute update <" + bufStr + ">");
-          int rc = stmt.executeUpdate(bufStr);
-          LOG.info("Removed " + rc + "  empty Aborted transactions: " + txnids + " from TXNS");
-          LOG.debug("Going to commit");
-          dbConn.commit();
+        close(rs);
+        if(txnids.size() <= 0) {
+          return;
+        }
+        for(int i = 0; i < txnids.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE; i++) {
+          List<Long> txnIdBatch = txnids.subList(i * TIMED_OUT_TXN_ABORT_BATCH_SIZE,
+            (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+          deleteTxns(dbConn, stmt, txnIdBatch);
+        }
+        int partialBatchSize = txnids.size() % TIMED_OUT_TXN_ABORT_BATCH_SIZE;
+        if(partialBatchSize > 0) {
+          List<Long> txnIdBatch = txnids.subList(txnids.size() - partialBatchSize, txnids.size());
+          deleteTxns(dbConn, stmt, txnIdBatch);
         }
       } catch (SQLException e) {
         LOG.error("Unable to delete from txns table " + e.getMessage());
@@ -469,6 +466,18 @@ public class CompactionTxnHandler extends TxnHandler {
       cleanEmptyAbortedTxns();
     }
   }
+  private static void deleteTxns(Connection dbConn, Statement stmt, List<Long> txnIdBatch) throws SQLException {
+    StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
+    for(long txnid : txnIdBatch) {
+      buf.append(txnid).append(',');
+    }
+    buf.setCharAt(buf.length() - 1, ')');
+    LOG.debug("Going to execute update <" + buf + ">");
+    int rc = stmt.executeUpdate(buf.toString());
+    LOG.info("Removed " + rc + "  empty Aborted transactions: " + txnIdBatch + " from TXNS");
+    LOG.debug("Going to commit");
+    dbConn.commit();
+  }
 
   /**
    * This will take all entries assigned to workers
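
For reference, the loop above partitions txnids by integer division (the full batches) plus a modulo remainder (one trailing partial batch), so every id lands in exactly one subList and no single IN clause exceeds TIMED_OUT_TXN_ABORT_BATCH_SIZE entries. A hypothetical walk-through of the arithmetic (demo only, not part of the patch):

    // Hypothetical demo class, not part of the patch.
    public class BatchMathDemo {
      public static void main(String[] args) {
        int size = 2350;   // e.g. 2350 empty aborted txns
        int batch = 1000;  // TIMED_OUT_TXN_ABORT_BATCH_SIZE
        // Full batches: subList(0,1000) and subList(1000,2000).
        System.out.println("full batches: " + (size / batch));        // 2
        // Trailing partial batch: subList(2000,2350).
        System.out.println("partial batch size: " + (size % batch));  // 350
      }
    }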

http://git-wip-us.apache.org/repos/asf/hive/blob/695d905b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7f8cb71..ca37bf0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -22,7 +22,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
 import org.apache.commons.dbcp.ConnectionFactory;
 import org.apache.commons.dbcp.DriverManagerConnectionFactory;
 import org.apache.commons.dbcp.PoolableConnectionFactory;
-import org.apache.tools.ant.taskdefs.Java;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
@@ -59,6 +60,8 @@ import java.util.concurrent.TimeUnit;
  * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
  * transaction in TXNS.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class TxnHandler {
   // Compactor states
   static final public String INITIATED_RESPONSE = "initiated";
@@ -87,7 +90,7 @@ public class TxnHandler {
   static final protected char LOCK_SEMI_SHARED = 'w';
 
   static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
-  static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100;
+  public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
   static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
 
   static private DataSource connPool;
@@ -1172,7 +1175,7 @@ public class TxnHandler {
    */
   protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaException {
     if (dbProduct == null) {
-      try {
+      try {//todo: make this work when conn == null
         String s = conn.getMetaData().getDatabaseProductName();
         if (s == null) {
           String msg = "getDatabaseProductName returns null, can't determine database product";
@@ -2046,7 +2049,7 @@ public class TxnHandler {
         stmt = dbConn.createStatement();
         String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN +
           "' and txn_last_heartbeat <  " + (now - timeout);
-        s = addLimitClause(dbConn, 2500, s);
+        s = addLimitClause(dbConn, 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         if(!rs.next()) {
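
Two related changes in this file: TIMED_OUT_TXN_ABORT_BATCH_SIZE is raised from 100 to 1000 and made public so the test below can reference it, and the timed-out-transaction sweep's row limit is now expressed as 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE instead of a hard-coded 2500, so the per-pass fetch stays a fixed multiple of the batch size. Judging by the constant's name, the fetched ids are presumably then aborted in batches of that same size, bounding each sweep at roughly 250 batches. A hedged sketch of that bound (illustrative names; the real logic lives in TxnHandler and may differ):

    import java.util.ArrayList;
    import java.util.List;

    // Hedged sketch with illustrative names, not the committed code.
    public class SweepBounds {
      static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;

      // With the query limited to 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE rows,
      // this produces at most 250 batches per sweep.
      static List<List<Long>> toBatches(List<Long> timedOutTxnIds) {
        List<List<Long>> batches = new ArrayList<>();
        for (int start = 0; start < timedOutTxnIds.size();
             start += TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
          int end = Math.min(start + TIMED_OUT_TXN_ABORT_BATCH_SIZE,
                             timedOutTxnIds.size());
          batches.add(timedOutTxnIds.subList(start, end));
        }
        return batches;
      }
    }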

http://git-wip-us.apache.org/repos/asf/hive/blob/695d905b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index e9b4154..03a6494 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -204,12 +205,12 @@ public class TestInitiator extends CompactorTest {
     LockResponse res = txnHandler.lock(req);
     txnHandler.abortTxn(new AbortTxnRequest(txnid));
 
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE  + 50; i++) {
       txnid = openTxn();
       txnHandler.abortTxn(new AbortTxnRequest(txnid));
     }
     GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
-    Assert.assertEquals(101, openTxns.getOpen_txnsSize());
+    Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
 
     startInitiator();