You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/03/25 00:28:28 UTC
[2/3] hive git commit: HIVE-13344 - port HIVE-12902 to 1.x line
(Eugene Koifman, reviewed by Wei Zheng)
HIVE-13344 - port HIVE-12902 to 1.x line (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c8295051
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c8295051
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c8295051
Branch: refs/heads/branch-1
Commit: c8295051cc26577dcc1eb17709d4ffc0f9784c5b
Parents: db2efe4
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Mar 24 16:21:07 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Mar 24 16:21:07 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
.../hive/ql/txn/compactor/TestCompactor.java | 25 +-
.../hive/metastore/AcidEventListener.java | 38 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 14 +-
.../hive/metastore/HiveMetaStoreClient.java | 6 +-
.../metastore/txn/CompactionTxnHandler.java | 47 +--
.../hadoop/hive/metastore/txn/TxnHandler.java | 127 +------
.../hadoop/hive/metastore/txn/TxnStore.java | 364 +++++++++++++++++++
.../hadoop/hive/metastore/txn/TxnUtils.java | 209 +++++++++++
.../metastore/txn/TestCompactionTxnHandler.java | 5 +-
.../hive/metastore/txn/TestTxnHandler.java | 183 +++++-----
.../metastore/txn/TestTxnHandlerNegative.java | 2 +-
.../ql/txn/AcidCompactionHistoryService.java | 16 +-
.../hive/ql/txn/AcidHouseKeeperService.java | 13 +-
.../hive/ql/txn/compactor/CompactorThread.java | 7 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 8 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 12 +-
.../hive/ql/txn/compactor/CompactorTest.java | 34 +-
.../hive/ql/txn/compactor/TestCleaner.java | 20 +-
.../hive/ql/txn/compactor/TestInitiator.java | 7 +-
.../hive/ql/txn/compactor/TestWorker.java | 13 +-
23 files changed, 847 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b78bea2..f84c940 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -39,6 +39,7 @@ import java.util.regex.Pattern;
import javax.security.auth.login.LoginException;
+import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,7 +57,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.HiveCompat;
-import com.google.common.base.Joiner;
/**
* Hive Configuration.
@@ -607,6 +607,10 @@ public class HiveConf extends Configuration {
METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
"Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" +
"This class is used to store and retrieval of raw metadata objects such as table, database"),
+ METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl",
+ "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler",
+ "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " +
+ "class is used to store and retrieve transactions and locks"),
METASTORE_CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver",
"Driver class name for a JDBC metastore"),
METASTORE_MANAGER_FACTORY_CLASS("javax.jdo.PersistenceManagerFactoryClass",
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 9c0f374..37bbab8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -8,7 +8,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -20,10 +19,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -187,7 +186,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(4, compacts.size());
@@ -290,7 +289,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(4, compacts.size());
@@ -363,7 +362,7 @@ public class TestCompactor {
execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
tblName + " after load:");
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
@@ -498,7 +497,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
@@ -538,7 +537,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
@@ -580,7 +579,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
@@ -620,7 +619,7 @@ public class TestCompactor {
writeBatch(connection, writer, true);
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -682,7 +681,7 @@ public class TestCompactor {
writeBatch(connection, writer, true);
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -738,7 +737,7 @@ public class TestCompactor {
txnBatch.abort();
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -804,7 +803,7 @@ public class TestCompactor {
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index 767bc54..b241e9e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
/**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnHandler;
*/
public class AcidEventListener extends MetaStoreEventListener {
- private TxnHandler txnHandler;
+ private TxnStore txnHandler;
private HiveConf hiveConf;
public AcidEventListener(Configuration configuration) {
@@ -46,24 +47,47 @@ public class AcidEventListener extends MetaStoreEventListener {
// We can loop thru all the tables to check if they are ACID first and then perform cleanup,
// but it's more efficient to unconditionally perform cleanup for the database, especially
// when there are a lot of tables
- txnHandler = new TxnHandler(hiveConf);
+ txnHandler = getTxnHandler();
txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
}
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
- if (TxnHandler.isAcidTable(tableEvent.getTable())) {
- txnHandler = new TxnHandler(hiveConf);
+ if (TxnUtils.isAcidTable(tableEvent.getTable())) {
+ txnHandler = getTxnHandler();
txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
}
}
@Override
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
- if (TxnHandler.isAcidTable(partitionEvent.getTable())) {
- txnHandler = new TxnHandler(hiveConf);
+ if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
+ txnHandler = getTxnHandler();
txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
partitionEvent.getPartitionIterator());
}
}
+ private TxnStore getTxnHandler() {
+ boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+ HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+ String origTxnMgr = null;
+ boolean origConcurrency = false;
+
+ // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
+ // which may change the values of below two entries, we need to avoid polluting the original values
+ if (hackOn) {
+ origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+ origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ }
+
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+ // Set them back
+ if (hackOn) {
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
+ }
+
+ return txnHandler;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index fba545d..bf65532 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
-
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -176,7 +175,8 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap;
import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.shims.HadoopShims;
@@ -308,9 +308,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
};
- private final ThreadLocal<TxnHandler> threadLocalTxn = new ThreadLocal<TxnHandler>() {
+ private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>() {
@Override
- protected synchronized TxnHandler initialValue() {
+ protected TxnStore initialValue() {
return null;
}
};
@@ -584,10 +584,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return ms;
}
- private TxnHandler getTxnHandler() {
- TxnHandler txn = threadLocalTxn.get();
+ private TxnStore getTxnHandler() {
+ TxnStore txn = threadLocalTxn.get();
if (txn == null) {
- txn = new TxnHandler(hiveConf);
+ txn = TxnUtils.getTxnStore(hiveConf);
threadLocalTxn.set(txn);
}
return txn;
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 393ef3b..50bf43c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -134,7 +134,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
@@ -1846,12 +1846,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
@Override
public ValidTxnList getValidTxns() throws TException {
- return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0);
+ return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0);
}
@Override
public ValidTxnList getValidTxns(long currentTxn) throws TException {
- return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn);
+ return TxnUtils.createValidReadTxnList(client.get_open_txns(), currentTxn);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 28e06ed..f7c738a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -20,27 +20,33 @@ package org.apache.hadoop.hive.metastore.txn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.util.StringUtils;
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
/**
* Extends the transaction handler with methods needed only by the compactor threads. These
* methods are not available through the thrift interface.
*/
-public class CompactionTxnHandler extends TxnHandler {
+class CompactionTxnHandler extends TxnHandler {
static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
// Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
// See TxnHandler for notes on how to deal with deadlocks. Follow those notes.
- public CompactionTxnHandler(HiveConf conf) {
- super(conf);
+ public CompactionTxnHandler() {
}
/**
@@ -385,7 +391,7 @@ public class CompactionTxnHandler extends TxnHandler {
}
// Populate the complete query with provided prefix and suffix
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -450,7 +456,7 @@ public class CompactionTxnHandler extends TxnHandler {
prefix.append("delete from TXNS where ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -620,27 +626,6 @@ public class CompactionTxnHandler extends TxnHandler {
}
/**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
- * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
- * compact the files, and thus treats only open transactions as invalid. Additionally any
- * txnId > highestOpenTxnId is also invalid. This is avoid creating something like
- * delta_17_120 where txnId 80, for example, is still open.
- * @param txns txn list from the metastore
- * @return a valid txn list.
- */
- public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
- long highWater = txns.getTxn_high_water_mark();
- long minOpenTxn = Long.MAX_VALUE;
- long[] exceptions = new long[txns.getOpen_txnsSize()];
- int i = 0;
- for (TxnInfo txn : txns.getOpen_txns()) {
- if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
- exceptions[i++] = txn.getId();
- }
- highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
- return new ValidCompactorTxnList(exceptions, -1, highWater);
- }
- /**
* Record the highest txn id that the {@code ci} compaction job will pay attention to.
*/
public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
@@ -746,7 +731,7 @@ public class CompactionTxnHandler extends TxnHandler {
prefix.append("delete from COMPLETED_COMPACTIONS where ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0ddc078..9789371 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -87,14 +87,7 @@ import java.util.concurrent.locks.ReentrantLock;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class TxnHandler {
- // Compactor states (Should really be enum)
- static final public String INITIATED_RESPONSE = "initiated";
- static final public String WORKING_RESPONSE = "working";
- static final public String CLEANING_RESPONSE = "ready for cleaning";
- static final public String FAILED_RESPONSE = "failed";
- static final public String SUCCEEDED_RESPONSE = "succeeded";
- static final public String ATTEMPTED_RESPONSE = "attempted";
+abstract class TxnHandler implements TxnStore {
static final protected char INITIATED_STATE = 'i';
static final protected char WORKING_STATE = 'w';
@@ -131,7 +124,7 @@ public class TxnHandler {
* Number of consecutive deadlocks we have seen
*/
private int deadlockCnt;
- private final long deadlockRetryInterval;
+ private long deadlockRetryInterval;
protected HiveConf conf;
protected DatabaseProduct dbProduct;
@@ -139,8 +132,8 @@ public class TxnHandler {
private long timeout;
private String identifierQuoteString; // quotes to use for quoting tables, where necessary
- private final long retryInterval;
- private final int retryLimit;
+ private long retryInterval;
+ private int retryLimit;
private int retryNum;
/**
* Derby specific concurrency control
@@ -157,7 +150,10 @@ public class TxnHandler {
// in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction,
// and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
- public TxnHandler(HiveConf conf) {
+ public TxnHandler() {
+ }
+
+ public void setConf(HiveConf conf) {
this.conf = conf;
checkQFileTestHack();
@@ -183,7 +179,6 @@ public class TxnHandler {
TimeUnit.MILLISECONDS);
retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
deadlockRetryInterval = retryInterval / 10;
-
}
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -1211,6 +1206,7 @@ public class TxnHandler {
* Clean up corresponding records in metastore tables, specifically:
* TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
*/
+ @Override
public void cleanupRecords(HiveObjectType type, Database db, Table table,
Iterator<Partition> partitionIterator) throws MetaException {
try {
@@ -1386,106 +1382,11 @@ public class TxnHandler {
String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
-
- /**
- * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
- * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6)
- * For NOT IN case, NOT IN list is broken into multiple AND clauses.
- * @param queries array of complete query strings
- * @param prefix part of the query that comes before IN list
- * @param suffix part of the query that comes after IN list
- * @param inList the list containing IN list values
- * @param inColumn column name of IN list operator
- * @param addParens add a pair of parenthesis outside the IN lists
- * e.g. ( id in (1,2,3) OR id in (4,5,6) )
- * @param notIn clause to be broken up is NOT IN
- */
- public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
- StringBuilder suffix, List<Long> inList,
- String inColumn, boolean addParens, boolean notIn) {
- int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
- int numWholeBatches = inList.size() / batchSize;
- StringBuilder buf = new StringBuilder();
- buf.append(prefix);
- if (addParens) {
- buf.append("(");
- }
- buf.append(inColumn);
- if (notIn) {
- buf.append(" not in (");
- } else {
- buf.append(" in (");
- }
-
- for (int i = 0; i <= numWholeBatches; i++) {
- if (needNewQuery(conf, buf)) {
- // Wrap up current query string
- if (addParens) {
- buf.append(")");
- }
- buf.append(suffix);
- queries.add(buf.toString());
-
- // Prepare a new query string
- buf.setLength(0);
- }
-
- if (i > 0) {
- if (notIn) {
- if (buf.length() == 0) {
- buf.append(prefix);
- if (addParens) {
- buf.append("(");
- }
- } else {
- buf.append(" and ");
- }
- buf.append(inColumn);
- buf.append(" not in (");
- } else {
- if (buf.length() == 0) {
- buf.append(prefix);
- if (addParens) {
- buf.append("(");
- }
- } else {
- buf.append(" or ");
- }
- buf.append(inColumn);
- buf.append(" in (");
- }
- }
-
- if (i * batchSize == inList.size()) {
- // At this point we just realized we don't need another query
- return;
- }
- for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
- buf.append(inList.get(j)).append(",");
- }
- buf.setCharAt(buf.length() - 1, ')');
- }
-
- if (addParens) {
- buf.append(")");
- }
- buf.append(suffix);
- queries.add(buf.toString());
- }
-
- /** Estimate if the size of a string will exceed certain limit */
- private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
- int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
- // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
- long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
- return sizeInBytes / 1024 > queryMemoryLimit;
- }
-
/**
* For testing only, do not use.
*/
@VisibleForTesting
- int numLocksInLockTable() throws SQLException, MetaException {
+ public int numLocksInLockTable() throws SQLException, MetaException {
Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
@@ -1508,7 +1409,7 @@ public class TxnHandler {
/**
* For testing only, do not use.
*/
- long setTimeout(long milliseconds) {
+ public long setTimeout(long milliseconds) {
long previous_timeout = timeout;
timeout = milliseconds;
return previous_timeout;
@@ -1975,7 +1876,7 @@ public class TxnHandler {
suffix.append("");
}
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -1998,7 +1899,7 @@ public class TxnHandler {
prefix.append("delete from HIVE_LOCKS where ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -2435,7 +2336,7 @@ public class TxnHandler {
prefix.append(" and hl_txnid = 0 and ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
int deletedLocks = 0;
for (String query : queries) {
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
new file mode 100644
index 0000000..6fc6ed9
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Contract for the handler that answers transaction, lock and compaction related calls that
+ * come into the metastore server.
+ *
+ * Note on log messages: please include txnid:X and lockid info using
+ * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
+ * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
+ * The txnid:X and lockid:Y format matches how Thrift object toString() methods are generated,
+ * so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat:
+ * for locks that are part of a transaction we set this to 0 (we would rather set it to NULL, but
+ * currently the DB schema declares it NOT NULL) and only update/read the heartbeat from the
+ * corresponding transaction in TXNS.
+ *
+ * All members of an interface are implicitly public (and fields implicitly static final), so
+ * the modifiers are not repeated on each declaration.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface TxnStore {
+
+  // Compactor states (should really be an enum)
+  String INITIATED_RESPONSE = "initiated";
+  String WORKING_RESPONSE = "working";
+  String CLEANING_RESPONSE = "ready for cleaning";
+  String FAILED_RESPONSE = "failed";
+  String SUCCEEDED_RESPONSE = "succeeded";
+  String ATTEMPTED_RESPONSE = "attempted";
+
+  int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
+
+  /**
+   * Hand the store the configuration it should operate with.
+   * @param conf configuration
+   */
+  void setConf(HiveConf conf);
+
+  /**
+   * Get information about open transactions. This returns extensive information about the
+   * transactions rather than just the list of transaction ids. Use it when the goal is to
+   * display the transactions (e.g. show transactions).
+   * @return information about open transactions
+   * @throws MetaException
+   */
+  GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
+
+  /**
+   * Get the list of valid transactions. This returns just the list of transactions that are
+   * open.
+   * @return list of open transactions, as well as a high water mark.
+   * @throws MetaException
+   */
+  GetOpenTxnsResponse getOpenTxns() throws MetaException;
+
+  /**
+   * Open a set of transactions.
+   * @param rqst request to open transactions
+   * @return information on opened transactions
+   * @throws MetaException
+   */
+  OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+
+  /**
+   * Abort (rollback) a transaction.
+   * @param rqst info on transaction to abort
+   * @throws NoSuchTxnException
+   * @throws MetaException
+   */
+  void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException;
+
+  /**
+   * Commit a transaction.
+   * @param rqst info on transaction to commit
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  void commitTxn(CommitTxnRequest rqst)
+      throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+  /**
+   * Obtain a lock.
+   * @param rqst information on the lock to obtain. If the requester is part of a transaction
+   *             the txn information must be included in the lock request.
+   * @return info on the lock, including whether it was obtained.
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  LockResponse lock(LockRequest rqst)
+      throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+  /**
+   * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait
+   * state.
+   * @param rqst info on the lock to check
+   * @return info on the state of the lock
+   * @throws NoSuchTxnException
+   * @throws NoSuchLockException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  LockResponse checkLock(CheckLockRequest rqst)
+      throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+  /**
+   * Unlock a lock. It is not legal to call this if the caller is part of a txn; in that case
+   * the txn should be committed or aborted instead. (Note: someday this will change, since
+   * multi-statement transactions will allow unlocking inside the transaction.)
+   * @param rqst lock to unlock
+   * @throws NoSuchLockException
+   * @throws TxnOpenException
+   * @throws MetaException
+   */
+  void unlock(UnlockRequest rqst)
+      throws NoSuchLockException, TxnOpenException, MetaException;
+
+  /**
+   * Get information on current locks.
+   * @param rqst lock information to retrieve
+   * @return lock information.
+   * @throws MetaException
+   */
+  ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
+
+  /**
+   * Send a heartbeat for a lock or a transaction.
+   * @param ids lock and/or txn id to heartbeat
+   * @throws NoSuchTxnException
+   * @throws NoSuchLockException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  void heartbeat(HeartbeatRequest ids)
+      throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+  /**
+   * Heartbeat a group of transactions together.
+   * @param rqst set of transactions to heartbeat
+   * @return info on txns that were heartbeated
+   * @throws MetaException
+   */
+  HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+      throws MetaException;
+
+  /**
+   * Submit a compaction request into the queue. This is called when a user manually requests a
+   * compaction.
+   * @param rqst information on what to compact
+   * @return id of the compaction that has been started
+   * @throws MetaException
+   */
+  long compact(CompactionRequest rqst) throws MetaException;
+
+  /**
+   * Show the list of current compactions.
+   * @param rqst info on which compactions to show
+   * @return compaction information
+   * @throws MetaException
+   */
+  ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
+
+  /**
+   * Add information on a set of dynamic partitions that participated in a transaction.
+   * @param rqst dynamic partition info.
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  void addDynamicPartitions(AddDynamicPartitions rqst)
+      throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+  /**
+   * Clean up the corresponding records in the metastore tables.
+   * @param type Hive object type
+   * @param db database object
+   * @param table table object
+   * @param partitionIterator partition iterator
+   * @throws MetaException
+   */
+  void cleanupRecords(HiveObjectType type, Database db, Table table,
+      Iterator<Partition> partitionIterator) throws MetaException;
+
+  /**
+   * Time out transactions and/or locks. This should only be called by the compactor.
+   */
+  void performTimeOuts();
+
+  /**
+   * Look through the completed_txn_components table for partitions or tables that may be
+   * ready for compaction. Also look through the txns and txn_components tables for aborted
+   * transactions that should be added to the list.
+   * @param maxAborted Maximum number of aborted queries to allow before marking this as a
+   *                   potential compaction.
+   * @return list of CompactionInfo structs. These will not have id, type,
+   * or runAs set since these are only potential compactions, not actual ones.
+   */
+  Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+
+  /**
+   * Set the user to run as. This is for the case where the request was generated by the
+   * user and so the worker must set this value later.
+   * @param cq_id id of this entry in the queue
+   * @param user user to run the jobs as
+   */
+  void setRunAs(long cq_id, String user) throws MetaException;
+
+  /**
+   * Grab the next compaction request off of the queue and assign it to the worker.
+   * @param workerId id of the worker calling this, will be recorded in the db
+   * @return an info element for this compaction request, or null if there is no work to do now.
+   */
+  CompactionInfo findNextToCompact(String workerId) throws MetaException;
+
+  /**
+   * Mark an entry in the queue as compacted and put it in the ready to clean state.
+   * @param info info on the compaction entry to mark as compacted.
+   */
+  void markCompacted(CompactionInfo info) throws MetaException;
+
+  /**
+   * Find entries in the queue that are ready to be cleaned.
+   * @return information on the entry in the queue.
+   */
+  List<CompactionInfo> findReadyToClean() throws MetaException;
+
+  /**
+   * Remove an entry from the queue after it has been compacted.
+   *
+   * @param info info on the compaction entry to remove
+   */
+  void markCleaned(CompactionInfo info) throws MetaException;
+
+  /**
+   * Mark a compaction entry as failed. This will move it to the compaction history queue with a
+   * failed status. It will NOT clean up aborted transactions in the table/partition associated
+   * with this compaction.
+   * @param info information on the compaction that failed.
+   * @throws MetaException
+   */
+  void markFailed(CompactionInfo info) throws MetaException;
+
+  /**
+   * Clean up aborted transactions from txns that have no components in txn_components. The
+   * reason such txns exist can be that no work was done in the txn (e.g. Streaming opened a
+   * TransactionBatch and abandoned it w/o doing any work) or that
+   * {@link #markCleaned(CompactionInfo)} was called.
+   */
+  void cleanEmptyAbortedTxns() throws MetaException;
+
+  /**
+   * Take all entries assigned to workers on a host and return them to the INITIATED state.
+   * The initiator should use this at start up to clean entries from any workers that were in
+   * the middle of compacting when the metastore shut down. It does not reset entries from
+   * worker threads on other hosts, as those may still be working.
+   * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
+   *                 so that like hostname% will match the worker id.
+   */
+  void revokeFromLocalWorkers(String hostname) throws MetaException;
+
+  /**
+   * Return all compaction queue entries that are assigned to a worker but are over the
+   * timeout back to the initiated state. This should be called by the initiator on start up
+   * and occasionally while running, to clean up after dead threads. At start up
+   * {@link #revokeFromLocalWorkers(String)} should be called first.
+   * @param timeout number of milliseconds since start time that should elapse before a worker
+   *                is declared dead.
+   */
+  void revokeTimedoutWorkers(long timeout) throws MetaException;
+
+  /**
+   * Query the metastore DB directly to find columns in the table which have statistics
+   * information. If {@code ci} includes partition info then per partition stats info is
+   * examined, otherwise table level stats are examined.
+   * @throws MetaException
+   */
+  List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
+
+  /**
+   * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   */
+  void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException;
+
+  /**
+   * For any given compactable entity (partition, or table if not partitioned) the history of
+   * compactions may look like "sssfffaaasffss", for example. The idea is to retain the tail
+   * (most recent part) of the history such that a configurable number of each type of state is
+   * present. Any other entries can be purged. This scheme has the advantage of always retaining
+   * the last failure/success even if it's not recent.
+   * @throws MetaException
+   */
+  void purgeCompactionHistory() throws MetaException;
+
+  /**
+   * Determine if there are enough consecutive failures compacting a table or partition that no
+   * new automatic compactions should be scheduled. User initiated compactions do not do this
+   * check.
+   * @param ci Table or partition to check.
+   * @return true if it is ok to compact, false if there have been too many failures.
+   * @throws MetaException
+   */
+  boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
+
+  /**
+   * Test hook. Judging by the name this reports the number of entries in the lock table;
+   * the exact table and counting rules are up to the implementation (TODO confirm against
+   * TxnHandler).
+   */
+  @VisibleForTesting
+  int numLocksInLockTable() throws SQLException, MetaException;
+
+  /**
+   * Test hook: replace the timeout used by the store. For testing only, do not use.
+   * @param milliseconds new timeout value
+   * @return the timeout that was in effect before this call (as implemented by TxnHandler)
+   */
+  @VisibleForTesting
+  long setTimeout(long milliseconds);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
new file mode 100644
index 0000000..f60e34b
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Static helpers shared by the transaction store and its callers: conversion of metastore
+ * open-transaction responses into {@link ValidTxnList}s, creation of the configured
+ * {@link TxnStore}, an ACID-table check, and construction of batched IN / NOT IN queries.
+ */
+public class TxnUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
+
+  /**
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+   * read the files, and thus treats both open and aborted transactions as invalid.
+   * @param txns txn list from the metastore
+   * @param currentTxn Current transaction that the user has open. If this is greater than 0 it
+   *                   will be removed from the exceptions list so that the user sees his own
+   *                   transaction as valid.
+   * @return a valid txn list.
+   */
+  public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+    long highWater = txns.getTxn_high_water_mark();
+    Set<Long> open = txns.getOpen_txns();
+    // Only reserve one slot less when the caller's txn really is in the open set. Sizing the
+    // array from "currentTxn > 0" alone overflows the array (or asks for a negative length)
+    // when the metastore response does not list that txn.
+    boolean skipCurrent = currentTxn > 0 && open.contains(currentTxn);
+    long[] exceptions = new long[open.size() - (skipCurrent ? 1 : 0)];
+    int i = 0;
+    for (long txn : open) {
+      if (skipCurrent && txn == currentTxn) {
+        continue;
+      }
+      exceptions[i++] = txn;
+    }
+    return new ValidReadTxnList(exceptions, highWater);
+  }
+
+  /**
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. Every txn id
+   * listed in the response is placed in the exceptions list. Additionally, if any listed txn
+   * is in state OPEN, the high water mark is lowered to one below the smallest such id, so any
+   * txnId at or above the lowest open txn is invalid. This is to avoid creating something like
+   * delta_17_120 where txnId 80, for example, is still open.
+   * @param txns txn list from the metastore
+   * @return a valid txn list.
+   */
+  public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+    long highWater = txns.getTxn_high_water_mark();
+    long minOpenTxn = Long.MAX_VALUE;
+    long[] exceptions = new long[txns.getOpen_txnsSize()];
+    int i = 0;
+    for (TxnInfo txn : txns.getOpen_txns()) {
+      if (txn.getState() == TxnState.OPEN) {
+        minOpenTxn = Math.min(minOpenTxn, txn.getId());
+      }
+      exceptions[i++] = txn.getId();
+    }
+    // Long.MAX_VALUE means no OPEN txn was seen; keep the metastore's high water mark then.
+    highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+    return new ValidCompactorTxnList(exceptions, -1, highWater);
+  }
+
+  /**
+   * Get an instance of the TxnStore that is appropriate for this store. The implementation
+   * class is read from {@link HiveConf.ConfVars#METASTORE_TXN_STORE_IMPL}, instantiated through
+   * its no-arg constructor and handed {@code conf} via {@link TxnStore#setConf(HiveConf)}.
+   * @param conf configuration
+   * @return txn store
+   * @throws RuntimeException wrapping the cause if the class cannot be loaded, is not a
+   *                          {@link TxnStore}, or cannot be instantiated/configured
+   */
+  public static TxnStore getTxnStore(HiveConf conf) {
+    String className = conf.getVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL);
+    try {
+      // Cast to the interface, not to TxnHandler, so that any TxnStore implementation can be
+      // plugged in. The cast is unchecked; a wrong class surfaces as a ClassCastException on
+      // newInstance() below and is reported like any other instantiation failure.
+      @SuppressWarnings("unchecked")
+      Class<? extends TxnStore> storeClass =
+          (Class<? extends TxnStore>) MetaStoreUtils.getClass(className);
+      TxnStore handler = storeClass.newInstance();
+      handler.setConf(conf);
+      return handler;
+    } catch (Exception e) {
+      LOG.error("Unable to instantiate TxnStore implementation " + className, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Checks if a table is a valid ACID table.
+   * Note, users are responsible for using the correct TxnManager. We do not look at
+   * SessionState.get().getTxnMgr().supportsAcid() here
+   * @param table table; may be null
+   * @return true if table is a legit ACID table, false otherwise (including when the table or
+   *         its parameter map is null)
+   */
+  public static boolean isAcidTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> parameters = table.getParameters();
+    if (parameters == null) {
+      // A table object without any parameters cannot carry the transactional flag.
+      return false;
+    }
+    String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    return "true".equalsIgnoreCase(tableIsTransactional);
+  }
+
+  /**
+   * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
+   * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6)
+   * For NOT IN case, NOT IN list is broken into multiple AND clauses.
+   *
+   * Each IN list holds at most METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE values. Before a
+   * further IN list is appended, the query built so far is checked against
+   * METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH; if it is over the limit it is completed and a new
+   * query is started. Nothing is added to {@code queries} when {@code inList} is empty.
+   *
+   * @param conf configuration supplying the two limits above
+   * @param queries array of complete query strings; finished queries are appended to it
+   * @param prefix part of the query that comes before IN list
+   * @param suffix part of the query that comes after IN list
+   * @param inList the list containing IN list values
+   * @param inColumn column name of IN list operator
+   * @param addParens add a pair of parenthesis outside the IN lists
+   *                  e.g. ( id in (1,2,3) OR id in (4,5,6) )
+   * @param notIn clause to be broken up is NOT IN
+   * @throws IllegalArgumentException if the configured maximum IN list size is not positive
+   */
+  public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
+                                            StringBuilder suffix, List<Long> inList,
+                                            String inColumn, boolean addParens, boolean notIn) {
+    int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
+    if (batchSize <= 0) {
+      throw new IllegalArgumentException(
+          "Maximum number of elements in an IN clause must be positive, got " + batchSize);
+    }
+    if (inList.isEmpty()) {
+      // Nothing to put into an IN list, hence no query to build.
+      return;
+    }
+
+    String operator = notIn ? " not in (" : " in (";
+    String connector = notIn ? " and " : " or ";
+    StringBuilder buf = new StringBuilder();
+    boolean startOfQuery = true;
+
+    // Walk the list one batch at a time. Iterating over the real batches (rather than
+    // 0..size/batchSize) means a list whose size is an exact multiple of batchSize no longer
+    // ends on an empty trailing batch, which used to drop the last query.
+    for (int start = 0; start < inList.size(); ) {
+      // Only a query that already holds at least one IN list can be wrapped up.
+      if (!startOfQuery && needNewQuery(conf, buf)) {
+        finishQuery(queries, buf, suffix, addParens);
+        buf.setLength(0);
+        startOfQuery = true;
+      }
+
+      if (startOfQuery) {
+        buf.append(prefix);
+        if (addParens) {
+          buf.append("(");
+        }
+        startOfQuery = false;
+      } else {
+        buf.append(connector);
+      }
+      buf.append(inColumn).append(operator);
+
+      int end = start + Math.min(batchSize, inList.size() - start);
+      for (int j = start; j < end; j++) {
+        buf.append(inList.get(j)).append(",");
+      }
+      // Replace the trailing comma with the parenthesis closing this IN list.
+      buf.setCharAt(buf.length() - 1, ')');
+      start = end;
+    }
+
+    finishQuery(queries, buf, suffix, addParens);
+  }
+
+  /** Close the optional outer parenthesis, append the suffix and record the finished query. */
+  private static void finishQuery(List<String> queries, StringBuilder buf, StringBuilder suffix,
+                                  boolean addParens) {
+    if (addParens) {
+      buf.append(")");
+    }
+    buf.append(suffix);
+    queries.add(buf.toString());
+  }
+
+  /**
+   * Estimate if the size of a string will exceed certain limit. The in-memory size of the
+   * string is estimated in bytes, converted to KB and compared against
+   * METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH.
+   */
+  private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
+    int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
+    // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
+    long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
+    return sizeInBytes / 1024 > queryMemoryLimit;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 051da60..bdeacb9 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -42,8 +42,7 @@ import static junit.framework.Assert.*;
public class TestCompactionTxnHandler {
private HiveConf conf = new HiveConf();
- private CompactionTxnHandler txnHandler;
- static final private Log LOG = LogFactory.getLog(TestCompactionTxnHandler.class);
+ private TxnStore txnHandler;
public TestCompactionTxnHandler() throws Exception {
TxnDbUtil.setConfValues(conf);
@@ -424,7 +423,7 @@ public class TestCompactionTxnHandler {
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 930af7c..b8cab71 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -34,7 +34,11 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static junit.framework.Assert.*;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
/**
* Tests for TxnHandler.
@@ -44,7 +48,7 @@ public class TestTxnHandler {
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
private HiveConf conf = new HiveConf();
- private TxnHandler txnHandler;
+ private TxnStore txnHandler;
public TestTxnHandler() throws Exception {
TxnDbUtil.setConfValues(conf);
@@ -1111,99 +1115,102 @@ public class TestTxnHandler {
@Ignore
public void deadlockDetected() throws Exception {
LOG.debug("Starting deadlock test");
- Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- Statement stmt = conn.createStatement();
- long now = txnHandler.getDbTime(conn);
- stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
- "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
- "'scooby.com')");
- stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
- "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
- "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
- txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
- "'scooby.com')");
- conn.commit();
- txnHandler.closeDbConn(conn);
-
- final AtomicBoolean sawDeadlock = new AtomicBoolean();
-
- final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- try {
+ if (txnHandler instanceof TxnHandler) {
+ final TxnHandler tHndlr = (TxnHandler)txnHandler;
+ Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Statement stmt = conn.createStatement();
+ long now = tHndlr.getDbTime(conn);
+ stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+ "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+ "'scooby.com')");
+ stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+ "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+ "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+ tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+ "'scooby.com')");
+ conn.commit();
+ tHndlr.closeDbConn(conn);
+
+ final AtomicBoolean sawDeadlock = new AtomicBoolean();
+
+ final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ try {
- for (int i = 0; i < 5; i++) {
- Thread t1 = new Thread() {
- @Override
- public void run() {
- try {
+ for (int i = 0; i < 5; i++) {
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
try {
- updateTxns(conn1);
- updateLocks(conn1);
- Thread.sleep(1000);
- conn1.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
try {
- txnHandler.checkRetryable(conn1, e, "thread t1");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
+ updateTxns(conn1);
+ updateLocks(conn1);
+ Thread.sleep(1000);
+ conn1.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ tHndlr.checkRetryable(conn1, e, "thread t1");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.RetryException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.set(true);
+ }
}
+ conn1.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- conn1.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
}
- }
- };
+ };
- Thread t2 = new Thread() {
- @Override
- public void run() {
- try {
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
try {
- updateLocks(conn2);
- updateTxns(conn2);
- Thread.sleep(1000);
- conn2.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
try {
- txnHandler.checkRetryable(conn2, e, "thread t2");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
+ updateLocks(conn2);
+ updateTxns(conn2);
+ Thread.sleep(1000);
+ conn2.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ tHndlr.checkRetryable(conn2, e, "thread t2");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.RetryException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.set(true);
+ }
}
+ conn2.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- conn2.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
}
- }
- };
-
- t1.start();
- t2.start();
- t1.join();
- t2.join();
- if (sawDeadlock.get()) break;
+ };
+
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ if (sawDeadlock.get()) break;
+ }
+ assertTrue(sawDeadlock.get());
+ } finally {
+ conn1.rollback();
+ tHndlr.closeDbConn(conn1);
+ conn2.rollback();
+ tHndlr.closeDbConn(conn2);
}
- assertTrue(sawDeadlock.get());
- } finally {
- conn1.rollback();
- txnHandler.closeDbConn(conn1);
- conn2.rollback();
- txnHandler.closeDbConn(conn2);
}
}
@@ -1236,7 +1243,7 @@ public class TestTxnHandler {
for (long i = 1; i <= 200; i++) {
inList.add(i);
}
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(1, queries.size());
runAgainstDerby(queries);
@@ -1244,7 +1251,7 @@ public class TestTxnHandler {
// The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
queries.clear();
inList.add((long)201);
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(2, queries.size());
runAgainstDerby(queries);
@@ -1255,13 +1262,13 @@ public class TestTxnHandler {
for (long i = 202; i <= 4321; i++) {
inList.add(i);
}
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(3, queries.size());
runAgainstDerby(queries);
// Case 4 - NOT IN list
queries.clear();
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
Assert.assertEquals(3, queries.size());
runAgainstDerby(queries);
@@ -1269,7 +1276,7 @@ public class TestTxnHandler {
queries.clear();
suffix.setLength(0);
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
Assert.assertEquals(3, queries.size());
runAgainstDerby(queries);
}
@@ -1297,7 +1304,7 @@ public class TestTxnHandler {
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
- txnHandler = new TxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
index abe1e37..6c27515 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
@@ -37,7 +37,7 @@ public class TestTxnHandlerNegative {
conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah");
RuntimeException e = null;
try {
- TxnHandler txnHandler1 = new TxnHandler(conf);
+ TxnUtils.getTxnStore(conf);
}
catch(RuntimeException ex) {
LOG.info("Expected error: " + ex.getMessage(), ex);
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index a91ca5c..59c8fe4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -18,20 +18,12 @@
package org.apache.hadoop.hive.ql.txn;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,10 +52,10 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
}
private static final class ObsoleteEntryReaper implements Runnable {
- private final CompactionTxnHandler txnHandler;
+ private final TxnStore txnHandler;
private final AtomicInteger isAliveCounter;
private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
- txnHandler = new CompactionTxnHandler(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
this.isAliveCounter = isAliveCounter;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index 38151fb..de74a7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -20,15 +20,10 @@ package org.apache.hadoop.hive.ql.txn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,10 +53,10 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
}
private static final class TimedoutTxnReaper implements Runnable {
- private final TxnHandler txnHandler;
+ private final TxnStore txnHandler;
private final AtomicInteger isAliveCounter;
private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
- txnHandler = new TxnHandler(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
this.isAliveCounter = isAliveCounter;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index c956f58..ae8865c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -50,7 +51,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
protected HiveConf conf;
- protected CompactionTxnHandler txnHandler;
+ protected TxnStore txnHandler;
protected RawStore rs;
protected int threadId;
protected AtomicBoolean stop;
@@ -75,7 +76,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
setDaemon(true); // this means the process will exit without waiting for this thread
// Get our own instance of the transaction handler
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
// Get our own connection to the database so we can get table and partition information.
rs = RawStoreProxy.getProxy(conf, conf,
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index c023c27..1898a4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -37,8 +37,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -81,7 +81,7 @@ public class Initiator extends CompactorThread {
try {//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
ValidTxnList txns =
- CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
@@ -184,7 +184,7 @@ public class Initiator extends CompactorThread {
CompactionInfo ci) {
if (compactions.getCompacts() != null) {
for (ShowCompactResponseElement e : compactions.getCompacts()) {
- if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) &&
+ if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
e.getDbname().equals(ci.dbname) &&
e.getTablename().equals(ci.tableName) &&
(e.getPartitionname() == null && ci.partName == null ||
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 59a765b..516b92e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -27,13 +27,15 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
@@ -135,7 +137,7 @@ public class Worker extends CompactorThread {
final boolean isMajor = ci.isMajorCompaction();
final ValidTxnList txns =
- CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
LOG.debug("ValidCompactTxnList: " + txns.writeToString());
txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
final StringBuilder jobName = new StringBuilder(name);
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 44b77e7..6f8dc35 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
@@ -486,7 +486,7 @@ public class TestTxnCommands2 {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
AtomicBoolean stop = new AtomicBoolean(true);
//create failed compactions
for(int i = 0; i < numFailedCompactions; i++) {
@@ -556,27 +556,27 @@ public class TestTxnCommands2 {
private int working;
private int total;
}
- private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
+ private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
CompactionsByState compactionsByState = new CompactionsByState();
compactionsByState.total = resp.getCompactsSize();
for(ShowCompactResponseElement compact : resp.getCompacts()) {
- if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
+ if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) {
compactionsByState.failed++;
}
- else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) {
compactionsByState.readyToClean++;
}
- else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) {
compactionsByState.initiated++;
}
- else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) {
compactionsByState.succeeded++;
}
- else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) {
compactionsByState.working++;
}
- else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
compactionsByState.attempted++;
}
}
@@ -632,7 +632,7 @@ public class TestTxnCommands2 {
runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
//run Worker to execute compaction
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index a4f7e5b..99705b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -42,7 +42,11 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -243,10 +247,10 @@ public class TestDbTxnManager {
}
expireLocks(txnMgr, 5);
//create a lot of locks
- for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+ for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
}
- expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+ expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
}
private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();