You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/12/02 00:52:36 UTC

svn commit: r1642789 - in /hive/trunk: hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java

Author: gates
Date: Mon Dec  1 23:52:35 2014
New Revision: 1642789

URL: http://svn.apache.org/r1642789
Log:
HIVE-8948 TestStreaming is flaky (Alan Gates, reviewed by Eugene Koifman)

Modified:
    hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java

Modified: hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1642789&r1=1642788&r2=1642789&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (original)
+++ hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Mon Dec  1 23:52:35 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapred.InputSpl
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.thrift.TException;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -169,6 +170,11 @@ public class TestStreaming {
     createDbAndTable(msClient, dbName2, tblName2, partitionVals);
   }
 
+  @After
+  public void cleanup() throws Exception {
+    msClient.close();
+  }
+
   private static List<FieldSchema> getPartitionKeys() {
     List<FieldSchema> fields = new ArrayList<FieldSchema>();
     // Defining partition names in unsorted order
@@ -664,7 +670,7 @@ public class TestStreaming {
     public void run() {
       TransactionBatch txnBatch = null;
       try {
-        txnBatch =  conn.fetchTransactionBatch(1000, writer);
+        txnBatch =  conn.fetchTransactionBatch(10, writer);
         while (txnBatch.remainingTransactions() > 0) {
           txnBatch.beginNextTransaction();
           txnBatch.write(data.getBytes());

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java?rev=1642789&r1=1642788&r2=1642789&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java Mon Dec  1 23:52:35 2014
@@ -22,9 +22,12 @@ import java.sql.Driver;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLTransactionRollbackException;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
@@ -34,8 +37,11 @@ import org.apache.hadoop.hive.shims.Shim
  */
 public final class TxnDbUtil {
 
+  static final private Log LOG = LogFactory.getLog(TxnDbUtil.class.getName());
   private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
 
+  private static int deadlockCnt = 0;
+
   private TxnDbUtil() {
     throw new UnsupportedOperationException("Can't initialize class");
   }
@@ -64,60 +70,70 @@ public final class TxnDbUtil {
       conn = getConnection();
       stmt = conn.createStatement();
       stmt.execute("CREATE TABLE TXNS (" +
-                   "  TXN_ID bigint PRIMARY KEY," +
-                   "  TXN_STATE char(1) NOT NULL," +
-                   "  TXN_STARTED bigint NOT NULL," +
-                   "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
-                   "  TXN_USER varchar(128) NOT NULL," +
-                   "  TXN_HOST varchar(128) NOT NULL)");
+          "  TXN_ID bigint PRIMARY KEY," +
+          "  TXN_STATE char(1) NOT NULL," +
+          "  TXN_STARTED bigint NOT NULL," +
+          "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
+          "  TXN_USER varchar(128) NOT NULL," +
+          "  TXN_HOST varchar(128) NOT NULL)");
 
       stmt.execute("CREATE TABLE TXN_COMPONENTS (" +
-                   "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
-                   "  TC_DATABASE varchar(128) NOT NULL," +
-                   "  TC_TABLE varchar(128)," +
-                   "  TC_PARTITION varchar(767))");
+          "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
+          "  TC_DATABASE varchar(128) NOT NULL," +
+          "  TC_TABLE varchar(128)," +
+          "  TC_PARTITION varchar(767))");
       stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
-                   "  CTC_TXNID bigint," +
-                   "  CTC_DATABASE varchar(128) NOT NULL," +
-                   "  CTC_TABLE varchar(128)," +
-                   "  CTC_PARTITION varchar(767))");
+          "  CTC_TXNID bigint," +
+          "  CTC_DATABASE varchar(128) NOT NULL," +
+          "  CTC_TABLE varchar(128)," +
+          "  CTC_PARTITION varchar(767))");
       stmt.execute("CREATE TABLE NEXT_TXN_ID (" + "  NTXN_NEXT bigint NOT NULL)");
       stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
       stmt.execute("CREATE TABLE HIVE_LOCKS (" +
-                   " HL_LOCK_EXT_ID bigint NOT NULL," +
-                   " HL_LOCK_INT_ID bigint NOT NULL," +
-                   " HL_TXNID bigint," +
-                   " HL_DB varchar(128) NOT NULL," +
-                   " HL_TABLE varchar(128)," +
-                   " HL_PARTITION varchar(767)," +
-                   " HL_LOCK_STATE char(1) NOT NULL," +
-                   " HL_LOCK_TYPE char(1) NOT NULL," +
-                   " HL_LAST_HEARTBEAT bigint NOT NULL," +
-                   " HL_ACQUIRED_AT bigint," +
-                   " HL_USER varchar(128) NOT NULL," +
-                   " HL_HOST varchar(128) NOT NULL," +
-                   " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+          " HL_LOCK_EXT_ID bigint NOT NULL," +
+          " HL_LOCK_INT_ID bigint NOT NULL," +
+          " HL_TXNID bigint," +
+          " HL_DB varchar(128) NOT NULL," +
+          " HL_TABLE varchar(128)," +
+          " HL_PARTITION varchar(767)," +
+          " HL_LOCK_STATE char(1) NOT NULL," +
+          " HL_LOCK_TYPE char(1) NOT NULL," +
+          " HL_LAST_HEARTBEAT bigint NOT NULL," +
+          " HL_ACQUIRED_AT bigint," +
+          " HL_USER varchar(128) NOT NULL," +
+          " HL_HOST varchar(128) NOT NULL," +
+          " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
       stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
 
       stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");
       stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
 
       stmt.execute("CREATE TABLE COMPACTION_QUEUE (" +
-                   " CQ_ID bigint PRIMARY KEY," +
-                   " CQ_DATABASE varchar(128) NOT NULL," +
-                   " CQ_TABLE varchar(128) NOT NULL," +
-                   " CQ_PARTITION varchar(767)," +
-                   " CQ_STATE char(1) NOT NULL," +
-                   " CQ_TYPE char(1) NOT NULL," +
-                   " CQ_WORKER_ID varchar(128)," +
-                   " CQ_START bigint," +
-                   " CQ_RUN_AS varchar(128))");
+          " CQ_ID bigint PRIMARY KEY," +
+          " CQ_DATABASE varchar(128) NOT NULL," +
+          " CQ_TABLE varchar(128) NOT NULL," +
+          " CQ_PARTITION varchar(767)," +
+          " CQ_STATE char(1) NOT NULL," +
+          " CQ_TYPE char(1) NOT NULL," +
+          " CQ_WORKER_ID varchar(128)," +
+          " CQ_START bigint," +
+          " CQ_RUN_AS varchar(128))");
 
       stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
       stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
 
       conn.commit();
+    } catch (SQLException e) {
+      // This might be a deadlock, if so, let's retry
+      conn.rollback();
+      if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
+        LOG.warn("Caught deadlock, retrying db creation");
+        prepDb();
+      } else {
+        throw e;
+      }
     } finally {
+      deadlockCnt = 0;
       closeResources(conn, stmt, null);
     }
   }