You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/11/23 17:58:36 UTC
[1/2] hive git commit: HIVE-12389 CompactionTxnHandler.cleanEmptyAbortedTxns() should safeguard against huge IN clauses (Eugene Koifman, reviewed by Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master 8e9bae21a -> f90d798e8
HIVE-12389 CompactionTxnHandler.cleanEmptyAbortedTxns() should safeguard against huge IN clauses (Eugene Koifman, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/695d905b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/695d905b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/695d905b
Branch: refs/heads/master
Commit: 695d905bd3fb27ffb04b28e11d5bd7210321b755
Parents: 8e9bae2
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Nov 23 08:18:07 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Nov 23 08:18:07 2015 -0800
----------------------------------------------------------------------
.../metastore/txn/CompactionTxnHandler.java | 47 ++++++++++++--------
.../hadoop/hive/metastore/txn/TxnHandler.java | 11 +++--
.../hive/ql/txn/compactor/TestInitiator.java | 5 ++-
3 files changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/695d905b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 5e4c7be..3e0e656 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -361,14 +361,13 @@ public class CompactionTxnHandler extends TxnHandler {
"marking compaction entry as clean!");
}
- //todo: add distinct in query
- s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
+ s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
info.tableName + "'";
if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
LOG.debug("Going to execute update <" + s + ">");
rs = stmt.executeQuery(s);
- Set<Long> txnids = new HashSet<Long>();
+ List<Long> txnids = new ArrayList<>();
while (rs.next()) txnids.add(rs.getLong(1));
if (txnids.size() > 0) {
@@ -437,23 +436,21 @@ public class CompactionTxnHandler extends TxnHandler {
"txn_state = '" + TXN_ABORTED + "'";
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
- Set<Long> txnids = new HashSet<Long>();
+ List<Long> txnids = new ArrayList<>();
while (rs.next()) txnids.add(rs.getLong(1));
- if (txnids.size() > 0) {
- StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
- boolean first = true;
- for (long tid : txnids) {
- if (first) first = false;
- else buf.append(", ");
- buf.append(tid);
- }
- buf.append(")");
- String bufStr = buf.toString();
- LOG.debug("Going to execute update <" + bufStr + ">");
- int rc = stmt.executeUpdate(bufStr);
- LOG.info("Removed " + rc + " empty Aborted transactions: " + txnids + " from TXNS");
- LOG.debug("Going to commit");
- dbConn.commit();
+ close(rs);
+ if(txnids.size() <= 0) {
+ return;
+ }
+ for(int i = 0; i < txnids.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE; i++) {
+ List<Long> txnIdBatch = txnids.subList(i * TIMED_OUT_TXN_ABORT_BATCH_SIZE,
+ (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+ deleteTxns(dbConn, stmt, txnIdBatch);
+ }
+ int partialBatchSize = txnids.size() % TIMED_OUT_TXN_ABORT_BATCH_SIZE;
+ if(partialBatchSize > 0) {
+ List<Long> txnIdBatch = txnids.subList(txnids.size() - partialBatchSize, txnids.size());
+ deleteTxns(dbConn, stmt, txnIdBatch);
}
} catch (SQLException e) {
LOG.error("Unable to delete from txns table " + e.getMessage());
@@ -469,6 +466,18 @@ public class CompactionTxnHandler extends TxnHandler {
cleanEmptyAbortedTxns();
}
}
+ private static void deleteTxns(Connection dbConn, Statement stmt, List<Long> txnIdBatch) throws SQLException {
+ StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
+ for(long txnid : txnIdBatch) {
+ buf.append(txnid).append(',');
+ }
+ buf.setCharAt(buf.length() - 1, ')');
+ LOG.debug("Going to execute update <" + buf + ">");
+ int rc = stmt.executeUpdate(buf.toString());
+ LOG.info("Removed " + rc + " empty Aborted transactions: " + txnIdBatch + " from TXNS");
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ }
/**
* This will take all entries assigned to workers
http://git-wip-us.apache.org/repos/asf/hive/blob/695d905b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7f8cb71..ca37bf0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -22,7 +22,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
import org.apache.commons.dbcp.ConnectionFactory;
import org.apache.commons.dbcp.DriverManagerConnectionFactory;
import org.apache.commons.dbcp.PoolableConnectionFactory;
-import org.apache.tools.ant.taskdefs.Java;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.dbcp.PoolingDataSource;
@@ -59,6 +60,8 @@ import java.util.concurrent.TimeUnit;
* Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
* transaction in TXNS.
*/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
public class TxnHandler {
// Compactor states
static final public String INITIATED_RESPONSE = "initiated";
@@ -87,7 +90,7 @@ public class TxnHandler {
static final protected char LOCK_SEMI_SHARED = 'w';
static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
- static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100;
+ public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
static private DataSource connPool;
@@ -1172,7 +1175,7 @@ public class TxnHandler {
*/
protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaException {
if (dbProduct == null) {
- try {
+ try {//todo: make this work when conn == null
String s = conn.getMetaData().getDatabaseProductName();
if (s == null) {
String msg = "getDatabaseProductName returns null, can't determine database product";
@@ -2046,7 +2049,7 @@ public class TxnHandler {
stmt = dbConn.createStatement();
String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN +
"' and txn_last_heartbeat < " + (now - timeout);
- s = addLimitClause(dbConn, 2500, s);
+ s = addLimitClause(dbConn, 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
if(!rs.next()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/695d905b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index e9b4154..03a6494 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -204,12 +205,12 @@ public class TestInitiator extends CompactorTest {
LockResponse res = txnHandler.lock(req);
txnHandler.abortTxn(new AbortTxnRequest(txnid));
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
txnid = openTxn();
txnHandler.abortTxn(new AbortTxnRequest(txnid));
}
GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
- Assert.assertEquals(101, openTxns.getOpen_txnsSize());
+ Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
startInitiator();
[2/2] hive git commit: HIVE-12409 make sure SessionState.initTxnMgr() is thread safe (Eugene Koifman, reviewed by Jason Dere)
Posted by ek...@apache.org.
HIVE-12409 make sure SessionState.initTxnMgr() is thread safe (Eugene Koifman, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f90d798e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f90d798e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f90d798e
Branch: refs/heads/master
Commit: f90d798e830d56745c8bc0cfee35741ed66aab90
Parents: 695d905
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Nov 23 08:20:06 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Nov 23 08:20:06 2015 -0800
----------------------------------------------------------------------
ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f90d798e/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index ff875df..5c69fb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -400,7 +400,7 @@ public class SessionState {
* @return transaction manager
* @throws LockException
*/
- public HiveTxnManager initTxnMgr(HiveConf conf) throws LockException {
+ public synchronized HiveTxnManager initTxnMgr(HiveConf conf) throws LockException {
if (txnMgr == null) {
txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
}