You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/03/17 23:47:29 UTC
[25/51] [abbrv] hive git commit: HIVE-10632 : Make sure
TXN_COMPONENTS gets cleaned up if table is dropped before compaction (Wei
Zheng, reviewed by Alan Gates)
HIVE-10632 : Make sure TXN_COMPONENTS gets cleaned up if table is dropped before compaction (Wei Zheng, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/456a91ec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/456a91ec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/456a91ec
Branch: refs/heads/llap
Commit: 456a91ecde6a449177a76fb34ad9b5f13983821b
Parents: ff55d0a
Author: Wei Zheng <we...@apache.org>
Authored: Thu Mar 10 14:37:35 2016 -0800
Committer: Wei Zheng <we...@apache.org>
Committed: Thu Mar 10 14:37:35 2016 -0800
----------------------------------------------------------------------
.../hive/metastore/AcidEventListener.java | 94 +++++++++
.../hadoop/hive/metastore/HiveMetaStore.java | 1 +
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 20 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 167 ++++++++++++++-
.../hadoop/hive/metastore/txn/TxnStore.java | 37 ++--
.../hadoop/hive/metastore/txn/TxnUtils.java | 18 ++
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 27 +--
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +-
.../hive/ql/lockmgr/TestDbTxnManager2.java | 209 ++++++++++++++++++-
9 files changed, 518 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
new file mode 100644
index 0000000..71ad916
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+
+
+/**
+ * It handles cleanup of dropped partition/table/database in ACID related metastore tables
+ */
+public class AcidEventListener extends MetaStoreEventListener {
+
+ private TxnStore txnHandler;
+ private HiveConf hiveConf;
+
+ public AcidEventListener(Configuration configuration) {
+ super(configuration);
+ hiveConf = (HiveConf) configuration;
+ }
+
+ @Override
+ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+ // We can loop thru all the tables to check if they are ACID first and then perform cleanup,
+ // but it's more efficient to unconditionally perform cleanup for the database, especially
+ // when there are a lot of tables
+ txnHandler = getTxnHandler();
+ txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
+ }
+
+ @Override
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+ if (TxnUtils.isAcidTable(tableEvent.getTable())) {
+ txnHandler = getTxnHandler();
+ txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
+ }
+ }
+
+ @Override
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+ if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
+ txnHandler = getTxnHandler();
+ txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
+ partitionEvent.getPartitionIterator());
+ }
+ }
+
+ private TxnStore getTxnHandler() {
+ boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+ HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+ String origTxnMgr = null;
+ boolean origConcurrency = false;
+
+ // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
+ // which may change the values of below two entries, we need to avoid pulluting the original values
+ if (hackOn) {
+ origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+ origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ }
+
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+ // Set them back
+ if (hackOn) {
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
+ }
+
+ return txnHandler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 0e8a157..f0bc560 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -443,6 +443,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
listeners.add(new SessionPropertiesListener(hiveConf));
+ listeners.add(new AcidEventListener(hiveConf));
if (metrics != null) {
listeners.add(new HMSMetricsListener(hiveConf, metrics));
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 2a7545c..5d10b5c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -32,7 +32,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.ShimLoader;
/**
- * Utility methods for creating and destroying txn database/schema.
+ * Utility methods for creating and destroying txn database/schema, plus methods for
+ * querying against metastore tables.
* Placed here in a separate class so it can be shared across unit tests.
*/
public final class TxnDbUtil {
@@ -142,8 +143,13 @@ public final class TxnDbUtil {
conn.commit();
} catch (SQLException e) {
+ try {
+ conn.rollback();
+ } catch (SQLException re) {
+ System.err.println("Error rolling back: " + re.getMessage());
+ }
+
// This might be a deadlock, if so, let's retry
- conn.rollback();
if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
LOG.warn("Caught deadlock, retrying db creation");
prepDb();
@@ -219,14 +225,20 @@ public final class TxnDbUtil {
}
}
- public static int findNumCurrentLocks() throws Exception {
+ /**
+ * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables
+ * @param countQuery countQuery text
+ * @return count countQuery result
+ * @throws Exception
+ */
+ public static int countQueryAgent(String countQuery) throws Exception {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = getConnection();
stmt = conn.createStatement();
- rs = stmt.executeQuery("select count(*) from hive_locks");
+ rs = stmt.executeQuery(countQuery);
if (!rs.next()) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d4d0162..53d2bb4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -25,6 +25,7 @@ import org.apache.commons.dbcp.DriverManagerConnectionFactory;
import org.apache.commons.dbcp.PoolableConnectionFactory;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.dbcp.PoolingDataSource;
@@ -1153,6 +1154,170 @@ abstract class TxnHandler implements TxnStore {
}
/**
+ * Clean up corresponding records in metastore tables, specifically:
+ * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
+ */
+ public void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+
+ try {
+ String dbName;
+ String tblName;
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ List<String> queries = new ArrayList<String>();
+ StringBuilder buff = new StringBuilder();
+
+ switch (type) {
+ case DATABASE:
+ dbName = db.getName();
+
+ buff.append("delete from TXN_COMPONENTS where tc_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPACTION_QUEUE where cq_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ break;
+ case TABLE:
+ dbName = table.getDbName();
+ tblName = table.getTableName();
+
+ buff.append("delete from TXN_COMPONENTS where tc_database='");
+ buff.append(dbName);
+ buff.append("' and tc_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+ buff.append(dbName);
+ buff.append("' and ctc_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPACTION_QUEUE where cq_database='");
+ buff.append(dbName);
+ buff.append("' and cq_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+ buff.append(dbName);
+ buff.append("' and cc_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ break;
+ case PARTITION:
+ dbName = table.getDbName();
+ tblName = table.getTableName();
+ List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns
+ List<String> partVals; // partition values
+ String partName;
+
+ while (partitionIterator.hasNext()) {
+ Partition p = partitionIterator.next();
+ partVals = p.getValues();
+ partName = Warehouse.makePartName(partCols, partVals);
+
+ buff.append("delete from TXN_COMPONENTS where tc_database='");
+ buff.append(dbName);
+ buff.append("' and tc_table='");
+ buff.append(tblName);
+ buff.append("' and tc_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+ buff.append(dbName);
+ buff.append("' and ctc_table='");
+ buff.append(tblName);
+ buff.append("' and ctc_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPACTION_QUEUE where cq_database='");
+ buff.append(dbName);
+ buff.append("' and cq_table='");
+ buff.append(tblName);
+ buff.append("' and cq_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+ buff.append(dbName);
+ buff.append("' and cc_table='");
+ buff.append(tblName);
+ buff.append("' and cc_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+ }
+
+ break;
+ default:
+ throw new MetaException("Invalid object type for cleanup: " + type);
+ }
+
+ for (String query : queries) {
+ LOG.debug("Going to execute update <" + query + ">");
+ stmt.executeUpdate(query);
+ }
+
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "cleanupRecords");
+ if (e.getMessage().contains("does not exist")) {
+ LOG.warn("Cannot perform cleanup since metastore table does not exist");
+ } else {
+ throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e));
+ }
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ cleanupRecords(type, db, table, partitionIterator);
+ }
+ }
+
+ /**
* For testing only, do not use.
*/
@VisibleForTesting
@@ -1599,7 +1764,7 @@ abstract class TxnHandler implements TxnStore {
TxnDbUtil.prepDb();
} catch (Exception e) {
// We may have already created the tables and thus don't need to redo it.
- if (!e.getMessage().contains("already exists")) {
+ if (e.getMessage() != null && !e.getMessage().contains("already exists")) {
throw new RuntimeException("Unable to set up transaction database for" +
" testing: " + e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 5e0306a..6d738b5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -21,32 +21,10 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
-import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.*;
import java.sql.SQLException;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -216,6 +194,17 @@ public interface TxnStore {
throws NoSuchTxnException, TxnAbortedException, MetaException;
/**
+ * Clean up corresponding records in metastore tables
+ * @param type Hive object type
+ * @param db database object
+ * @param table table object
+ * @param partitionIterator partition iterator
+ * @throws MetaException
+ */
+ public void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator) throws MetaException;
+
+ /**
* Timeout transactions and/or locks. This should only be called by the compactor.
*/
public void performTimeOuts();
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index b7502c2..0d90b11 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,11 +23,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.Set;
public class TxnUtils {
@@ -94,4 +97,19 @@ public class TxnUtils {
throw new RuntimeException(e);
}
}
+
+ /** Checks if a table is a valid ACID table.
+ * Note, users are responsible for using the correct TxnManager. We do not look at
+ * SessionState.get().getTxnMgr().supportsAcid() here
+ * @param table table
+ * @return true if table is a legit ACID table, false otherwise
+ */
+ public static boolean isAcidTable(Table table) {
+ if (table == null) {
+ return false;
+ }
+ Map<String, String> parameters = table.getParameters();
+ String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 9bf9377..2b50a2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.io;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.OutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -706,12 +704,7 @@ public class AcidUtils {
HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
}
- /** Checks metadata to make sure it's a valid ACID table at metadata level
- * Three things we will check:
- * 1. TBLPROPERTIES 'transactional'='true'
- * 2. The table should be bucketed
- * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat
- * Currently OrcInputFormat/OrcOutputFormat is the only implementer
+ /** Checks if a table is a valid ACID table.
* Note, users are responsible for using the correct TxnManager. We do not look at
* SessionState.get().getTxnMgr().supportsAcid() here
* @param table table
@@ -725,23 +718,7 @@ public class AcidUtils {
if (tableIsTransactional == null) {
tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
- if (tableIsTransactional == null || !tableIsTransactional.equalsIgnoreCase("true")) {
- return false;
- }
-
- List<String> bucketCols = table.getBucketCols();
- if (bucketCols == null || bucketCols.isEmpty()) {
- return false;
- }
-
- Class<? extends InputFormat> inputFormatClass = table.getInputFormatClass();
- Class<? extends OutputFormat> outputFormatClass = table.getOutputFormatClass();
- if (inputFormatClass == null || outputFormatClass == null ||
- !AcidInputFormat.class.isAssignableFrom(inputFormatClass) ||
- !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) {
- return false;
- }
- return true;
+ return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index f4debfe..9b00435 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -331,14 +331,7 @@ public class TestTxnCommands2 {
// 4. Perform a major compaction
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
- Worker w = new Worker();
- w.setThreadId((int) w.getId());
- w.setHiveConf(hiveConf);
- AtomicBoolean stop = new AtomicBoolean();
- AtomicBoolean looped = new AtomicBoolean();
- stop.set(true);
- w.init(stop, looped);
- w.run();
+ runWorker(hiveConf);
// There should be 1 new directory: base_xxxxxxx.
// Original bucket files and delta directory should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -375,14 +368,7 @@ public class TestTxnCommands2 {
// Before Cleaner, there should be 5 items:
// 2 original files, 1 original directory, 1 base directory and 1 delta directory
Assert.assertEquals(5, status.length);
- Cleaner c = new Cleaner();
- c.setThreadId((int) c.getId());
- c.setHiveConf(hiveConf);
- stop = new AtomicBoolean();
- looped = new AtomicBoolean();
- stop.set(true);
- c.init(stop, looped);
- c.run();
+ runCleaner(hiveConf);
// There should be only 1 directory left: base_xxxxxxx.
// Original bucket files and delta directory should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -596,7 +582,7 @@ public class TestTxnCommands2 {
}
return compactionsByState;
}
- private static void runWorker(HiveConf hiveConf) throws MetaException {
+ public static void runWorker(HiveConf hiveConf) throws MetaException {
AtomicBoolean stop = new AtomicBoolean(true);
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -605,7 +591,7 @@ public class TestTxnCommands2 {
t.init(stop, looped);
t.run();
}
- private static void runCleaner(HiveConf hiveConf) throws MetaException {
+ public static void runCleaner(HiveConf hiveConf) throws MetaException {
AtomicBoolean stop = new AtomicBoolean(true);
Cleaner t = new Cleaner();
t.setThreadId((int) t.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/456a91ec/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 1b07d4b..d1b370e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -248,34 +248,233 @@ public class TestDbTxnManager2 {
cpr = driver.run("create table T11 (a int, b int) clustered by(b) into 2 buckets stored as orc");
checkCmdOnDriver(cpr);
- // Now switch to DummyTxnManager
- conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- txnMgr = SessionState.get().initTxnMgr(conf);
- Assert.assertTrue(txnMgr instanceof DummyTxnManager);
-
// All DML should fail with DummyTxnManager on ACID table
+ useDummyTxnManagerTemporarily(conf);
cpr = driver.compileAndRespond("select * from T10");
Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode());
Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table"));
+ useDummyTxnManagerTemporarily(conf);
cpr = driver.compileAndRespond("insert into table T10 values (1, 2)");
Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode());
Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table"));
+ useDummyTxnManagerTemporarily(conf);
cpr = driver.compileAndRespond("insert overwrite table T10 select a, b from T11");
Assert.assertEquals(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getErrorCode(), cpr.getResponseCode());
Assert.assertTrue(cpr.getErrorMessage().contains("INSERT OVERWRITE not allowed on table with OutputFormat" +
" that implements AcidOutputFormat while transaction manager that supports ACID is in use"));
+ useDummyTxnManagerTemporarily(conf);
cpr = driver.compileAndRespond("update T10 set a=0 where b=1");
Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode());
Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
+ useDummyTxnManagerTemporarily(conf);
cpr = driver.compileAndRespond("delete from T10");
Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode());
Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
}
+ /**
+ * Temporarily set DummyTxnManager as the txn manager for the session.
+ * HIVE-10632: we have to do this for every new query, because this jira introduced an AcidEventListener
+ * in HiveMetaStore, which will instantiate a txn handler, but due to HIVE-12902, we have to call
+ * TxnHandler.setConf and TxnHandler.checkQFileTestHack and TxnDbUtil.setConfValues, which will
+ * set txn manager back to DbTxnManager.
+ */
+ private void useDummyTxnManagerTemporarily(HiveConf hiveConf) throws Exception {
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+ txnMgr = SessionState.get().initTxnMgr(hiveConf);
+ Assert.assertTrue(txnMgr instanceof DummyTxnManager);
+ }
+
+ /**
+ * Normally the compaction process will clean up records in TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS,
+ * COMPACTION_QUEUE and COMPLETED_COMPACTIONS. But if a table/partition has been dropped before
+ * compaction and there are still relevant records in those metastore tables, the Initiator will
+ * complain about not being able to find the table/partition. This method is to test and make sure
+ * we clean up relevant records as soon as a table/partition is dropped.
+ *
+ * Note, here we don't need to worry about cleaning up TXNS table, since it's handled separately.
+ * @throws Exception
+ */
+ @Test
+ public void testMetastoreTablesCleanup() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create database if not exists temp");
+ checkCmdOnDriver(cpr);
+
+ // Create some ACID tables: T10, T11 - unpartitioned table, T12p, T13p - partitioned table
+ cpr = driver.run("create table temp.T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table temp.T11 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table temp.T12p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table temp.T13p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ // Successfully insert some data into ACID tables, so that we have records in COMPLETED_TXN_COMPONENTS
+ cpr = driver.run("insert into temp.T10 values (1, 1)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T10 values (2, 2)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T11 values (3, 3)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T11 values (4, 4)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (5, 5)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (6, 6)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)");
+ checkCmdOnDriver(cpr);
+ int count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')");
+ Assert.assertEquals(4, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')");
+ Assert.assertEquals(4, count);
+
+ // Fail some inserts, so that we have records in TXN_COMPONENTS
+ conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+ cpr = driver.run("insert into temp.T10 values (9, 9)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T11 values (10, 10)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (11, 11)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (12, 12)");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(4, count);
+ conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+ // Drop a table/partition; corresponding records in TXN_COMPONENTS and COMPLETED_TXN_COMPONENTS should disappear
+ count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+ Assert.assertEquals(1, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+ Assert.assertEquals(2, count);
+ cpr = driver.run("drop table temp.T10");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+ Assert.assertEquals(0, count);
+
+ count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+ Assert.assertEquals(1, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+ Assert.assertEquals(1, count);
+ cpr = driver.run("alter table temp.T12p drop partition (ds='today', hour='1')");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+ Assert.assertEquals(0, count);
+
+ // Successfully perform compaction on a table/partition, so that we have successful records in COMPLETED_COMPACTIONS
+ cpr = driver.run("alter table temp.T11 compact 'minor'");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'");
+ Assert.assertEquals(1, count);
+ org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'");
+ Assert.assertEquals(1, count);
+ org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'");
+ Assert.assertEquals(1, count);
+
+ cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'minor'");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'");
+ Assert.assertEquals(1, count);
+ org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'");
+ Assert.assertEquals(1, count);
+ org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'");
+ Assert.assertEquals(1, count);
+
+ // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS
+ conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+ cpr = driver.run("alter table temp.T11 compact 'major'");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+ Assert.assertEquals(1, count);
+ org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'");
+ Assert.assertEquals(1, count);
+
+ cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+ Assert.assertEquals(1, count);
+ org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'");
+ Assert.assertEquals(1, count);
+ conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
+
+ // Put 2 records into COMPACTION_QUEUE and do nothing
+ cpr = driver.run("alter table temp.T11 compact 'major'");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+ Assert.assertEquals(1, count);
+ cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+ Assert.assertEquals(1, count);
+
+ // Drop a table/partition, corresponding records in COMPACTION_QUEUE and COMPLETED_COMPACTIONS should disappear
+ cpr = driver.run("drop table temp.T11");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11'");
+ Assert.assertEquals(0, count);
+
+ cpr = driver.run("alter table temp.T12p drop partition (ds='tomorrow', hour='2')");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p'");
+ Assert.assertEquals(0, count);
+
+ // Put 1 record into COMPACTION_QUEUE and do nothing
+ cpr = driver.run("alter table temp.T13p partition (ds='today', hour='1') compact 'major'");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t13p' and CQ_STATE='i' and CQ_TYPE='a'");
+ Assert.assertEquals(1, count);
+
+ // Drop database, everything in all 4 meta tables should disappear
+ count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(1, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(2, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(1, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(0, count);
+ cpr = driver.run("drop database if exists temp cascade");
+ checkCmdOnDriver(cpr);
+ count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(0, count);
+ count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+ Assert.assertEquals(0, count);
+ }
+
private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
Assert.assertEquals(l.toString(),l.getType(), type);
Assert.assertEquals(l.toString(),l.getState(), state);