You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/12/02 00:52:36 UTC
svn commit: r1642789 - in /hive/trunk:
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
Author: gates
Date: Mon Dec 1 23:52:35 2014
New Revision: 1642789
URL: http://svn.apache.org/r1642789
Log:
HIVE-8948 TestStreaming is flaky (Alan Gates, reviewed by Eugene Koifman)
Modified:
hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
Modified: hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1642789&r1=1642788&r2=1642789&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (original)
+++ hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Mon Dec 1 23:52:35 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapred.InputSpl
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.thrift.TException;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -169,6 +170,11 @@ public class TestStreaming {
createDbAndTable(msClient, dbName2, tblName2, partitionVals);
}
+ @After
+ public void cleanup() throws Exception {
+ msClient.close();
+ }
+
private static List<FieldSchema> getPartitionKeys() {
List<FieldSchema> fields = new ArrayList<FieldSchema>();
// Defining partition names in unsorted order
@@ -664,7 +670,7 @@ public class TestStreaming {
public void run() {
TransactionBatch txnBatch = null;
try {
- txnBatch = conn.fetchTransactionBatch(1000, writer);
+ txnBatch = conn.fetchTransactionBatch(10, writer);
while (txnBatch.remainingTransactions() > 0) {
txnBatch.beginNextTransaction();
txnBatch.write(data.getBytes());
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java?rev=1642789&r1=1642788&r2=1642789&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java Mon Dec 1 23:52:35 2014
@@ -22,9 +22,12 @@ import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLTransactionRollbackException;
import java.sql.Statement;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -34,8 +37,11 @@ import org.apache.hadoop.hive.shims.Shim
*/
public final class TxnDbUtil {
+ static final private Log LOG = LogFactory.getLog(TxnDbUtil.class.getName());
private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+ private static int deadlockCnt = 0;
+
private TxnDbUtil() {
throw new UnsupportedOperationException("Can't initialize class");
}
@@ -64,60 +70,70 @@ public final class TxnDbUtil {
conn = getConnection();
stmt = conn.createStatement();
stmt.execute("CREATE TABLE TXNS (" +
- " TXN_ID bigint PRIMARY KEY," +
- " TXN_STATE char(1) NOT NULL," +
- " TXN_STARTED bigint NOT NULL," +
- " TXN_LAST_HEARTBEAT bigint NOT NULL," +
- " TXN_USER varchar(128) NOT NULL," +
- " TXN_HOST varchar(128) NOT NULL)");
+ " TXN_ID bigint PRIMARY KEY," +
+ " TXN_STATE char(1) NOT NULL," +
+ " TXN_STARTED bigint NOT NULL," +
+ " TXN_LAST_HEARTBEAT bigint NOT NULL," +
+ " TXN_USER varchar(128) NOT NULL," +
+ " TXN_HOST varchar(128) NOT NULL)");
stmt.execute("CREATE TABLE TXN_COMPONENTS (" +
- " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
- " TC_DATABASE varchar(128) NOT NULL," +
- " TC_TABLE varchar(128)," +
- " TC_PARTITION varchar(767))");
+ " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
+ " TC_DATABASE varchar(128) NOT NULL," +
+ " TC_TABLE varchar(128)," +
+ " TC_PARTITION varchar(767))");
stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
- " CTC_TXNID bigint," +
- " CTC_DATABASE varchar(128) NOT NULL," +
- " CTC_TABLE varchar(128)," +
- " CTC_PARTITION varchar(767))");
+ " CTC_TXNID bigint," +
+ " CTC_DATABASE varchar(128) NOT NULL," +
+ " CTC_TABLE varchar(128)," +
+ " CTC_PARTITION varchar(767))");
stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)");
stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
stmt.execute("CREATE TABLE HIVE_LOCKS (" +
- " HL_LOCK_EXT_ID bigint NOT NULL," +
- " HL_LOCK_INT_ID bigint NOT NULL," +
- " HL_TXNID bigint," +
- " HL_DB varchar(128) NOT NULL," +
- " HL_TABLE varchar(128)," +
- " HL_PARTITION varchar(767)," +
- " HL_LOCK_STATE char(1) NOT NULL," +
- " HL_LOCK_TYPE char(1) NOT NULL," +
- " HL_LAST_HEARTBEAT bigint NOT NULL," +
- " HL_ACQUIRED_AT bigint," +
- " HL_USER varchar(128) NOT NULL," +
- " HL_HOST varchar(128) NOT NULL," +
- " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+ " HL_LOCK_EXT_ID bigint NOT NULL," +
+ " HL_LOCK_INT_ID bigint NOT NULL," +
+ " HL_TXNID bigint," +
+ " HL_DB varchar(128) NOT NULL," +
+ " HL_TABLE varchar(128)," +
+ " HL_PARTITION varchar(767)," +
+ " HL_LOCK_STATE char(1) NOT NULL," +
+ " HL_LOCK_TYPE char(1) NOT NULL," +
+ " HL_LAST_HEARTBEAT bigint NOT NULL," +
+ " HL_ACQUIRED_AT bigint," +
+ " HL_USER varchar(128) NOT NULL," +
+ " HL_HOST varchar(128) NOT NULL," +
+ " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");
stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
stmt.execute("CREATE TABLE COMPACTION_QUEUE (" +
- " CQ_ID bigint PRIMARY KEY," +
- " CQ_DATABASE varchar(128) NOT NULL," +
- " CQ_TABLE varchar(128) NOT NULL," +
- " CQ_PARTITION varchar(767)," +
- " CQ_STATE char(1) NOT NULL," +
- " CQ_TYPE char(1) NOT NULL," +
- " CQ_WORKER_ID varchar(128)," +
- " CQ_START bigint," +
- " CQ_RUN_AS varchar(128))");
+ " CQ_ID bigint PRIMARY KEY," +
+ " CQ_DATABASE varchar(128) NOT NULL," +
+ " CQ_TABLE varchar(128) NOT NULL," +
+ " CQ_PARTITION varchar(767)," +
+ " CQ_STATE char(1) NOT NULL," +
+ " CQ_TYPE char(1) NOT NULL," +
+ " CQ_WORKER_ID varchar(128)," +
+ " CQ_START bigint," +
+ " CQ_RUN_AS varchar(128))");
stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
conn.commit();
+ } catch (SQLException e) {
+ // This might be a deadlock, if so, let's retry
+ conn.rollback();
+ if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
+ LOG.warn("Caught deadlock, retrying db creation");
+ prepDb();
+ } else {
+ throw e;
+ }
} finally {
+ deadlockCnt = 0;
closeResources(conn, stmt, null);
}
}