You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/03/25 00:28:27 UTC
[1/3] hive git commit: HIVE-13344 - port HIVE-12902 to 1.x line
(Eugene Koifman, reviewed by Wei Zheng)
Repository: hive
Updated Branches:
refs/heads/branch-1 db2efe42a -> 178708231
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 5545574..cac4623 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -20,15 +20,31 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreThread;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -39,7 +55,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.thrift.TException;
@@ -62,7 +82,7 @@ public abstract class CompactorTest {
static final private String CLASS_NAME = CompactorTest.class.getName();
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
- protected CompactionTxnHandler txnHandler;
+ protected TxnStore txnHandler;
protected IMetaStoreClient ms;
protected long sleepTime = 1000;
protected HiveConf conf;
@@ -75,7 +95,7 @@ public abstract class CompactorTest {
TxnDbUtil.setConfValues(conf);
TxnDbUtil.cleanDb();
ms = new HiveMetaStoreClient(conf);
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
tmpdir = new File(System.getProperty("java.io.tmpdir") +
System.getProperty("file.separator") + "compactor_test_tables");
tmpdir.mkdir();
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 913c8bc..17634f0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -25,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.Test;
import java.util.ArrayList;
@@ -73,7 +73,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, null);
@@ -105,7 +105,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, p);
@@ -135,7 +135,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, null);
@@ -174,7 +174,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, p);
@@ -329,7 +329,7 @@ public class TestCleaner extends CompactorTest {
rsp = txnHandler.showCompact(new ShowCompactRequest());
compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Test
@@ -403,7 +403,7 @@ public class TestCleaner extends CompactorTest {
rsp = txnHandler.showCompact(new ShowCompactRequest());
compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Test
@@ -429,7 +429,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, p);
@@ -460,7 +460,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Test
@@ -488,7 +488,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Override
boolean useHive130DeltaDirName() {
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 07f477d..f84bd7e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.Before;
import org.junit.Test;
@@ -205,12 +204,12 @@ public class TestInitiator extends CompactorTest {
LockResponse res = txnHandler.lock(req);
txnHandler.abortTxn(new AbortTxnRequest(txnid));
- for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
+ for (int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
txnid = openTxn();
txnHandler.abortTxn(new AbortTxnRequest(txnid));
}
GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
- Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
+ Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
startInitiator();
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 8862402..381eeb3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -22,13 +22,18 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -935,7 +940,7 @@ public class TestWorker extends CompactorTest {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
}
@Test
@@ -960,6 +965,6 @@ public class TestWorker extends CompactorTest {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
}
[3/3] hive git commit: HIVE-11388 - Allow ACID Compactor components
to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17870823
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17870823
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17870823
Branch: refs/heads/branch-1
Commit: 178708231e09bb2c08aa05cf9979efd6d3cd542c
Parents: c829505
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Mar 24 16:22:21 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Mar 24 16:22:21 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/common/ServerUtils.java | 14 ++
.../deployers/config/hive/hive-site.mysql.xml | 24 ++-
.../hive/metastore/txn/CompactionInfo.java | 4 +
.../metastore/txn/CompactionTxnHandler.java | 7 +-
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 20 ++-
.../hadoop/hive/metastore/txn/TxnHandler.java | 169 ++++++++++++++++++-
.../hadoop/hive/metastore/txn/TxnStore.java | 33 +++-
.../hadoop/hive/metastore/txn/TxnUtils.java | 4 +-
.../metastore/txn/ValidCompactorTxnList.java | 2 +-
.../hive/metastore/txn/TestTxnHandler.java | 93 ++++++++++
.../ql/txn/AcidCompactionHistoryService.java | 7 +
.../hive/ql/txn/AcidHouseKeeperService.java | 7 +
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 62 ++++++-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 19 ++-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 7 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 6 +
16 files changed, 445 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index a284f18..4141770 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
/**
* ServerUtils (specific to HiveServer version 1)
*/
@@ -47,4 +50,15 @@ public class ServerUtils {
}
}
+ /**
+ * @return name of current host
+ */
+ public static String hostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ LOG.error("Unable to resolve my host name " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
----------------------------------------------------------------------
diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
index b6f1ab7..387da6c 100644
--- a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -62,13 +62,14 @@
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
+
<property>
<name>hive.compactor.initiator.on</name>
- <value>false</value>
+ <value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
- <value>2</value>
+ <value>5</value>
</property>
<property>
<name>hive.timedout.txn.reaper.start</name>
@@ -81,9 +82,24 @@
-->
<property>
<name>hive.timedout.txn.reaper.interval</name>
- <value>30s</value>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.history.reaper.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.cleaner.run.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.check.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.delta.num.threshold</name>
+ <value>2</value>
</property>
-
<!--end ACID related properties-->
<!--
<property>
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 73255d2..bea1473 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.metastore.txn;
import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -39,6 +40,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
public boolean tooManyAborts = false;
/**
* {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
+ * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition.
+ * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
+ * {@link ValidCompactorTxnList#highWatermark}
*/
public long highestTxnId;
byte[] metaInfo;
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index f7c738a..cdff357 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -161,6 +161,8 @@ class CompactionTxnHandler extends TxnHandler {
try {
Connection dbConn = null;
Statement stmt = null;
+ //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
+ Statement updStmt = null;
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -174,6 +176,7 @@ class CompactionTxnHandler extends TxnHandler {
dbConn.rollback();
return null;
}
+ updStmt = dbConn.createStatement();
do {
CompactionInfo info = new CompactionInfo();
info.id = rs.getLong(1);
@@ -187,7 +190,7 @@ class CompactionTxnHandler extends TxnHandler {
"cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
" AND cq_state='" + INITIATED_STATE + "'";
LOG.debug("Going to execute update <" + s + ">");
- int updCount = stmt.executeUpdate(s);
+ int updCount = updStmt.executeUpdate(s);
if(updCount == 1) {
dbConn.commit();
return info;
@@ -211,6 +214,7 @@ class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
+ closeStmt(updStmt);
close(rs, stmt, dbConn);
}
} catch (RetryException e) {
@@ -627,6 +631,7 @@ class CompactionTxnHandler extends TxnHandler {
/**
* Record the highest txn id that the {@code ci} compaction job will pay attention to.
+ * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
*/
public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
Connection dbConn = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 42415ac..56c9ed8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -68,7 +68,7 @@ public final class TxnDbUtil {
Connection conn = null;
Statement stmt = null;
try {
- conn = getConnection();
+ conn = getConnection(true);
stmt = conn.createStatement();
stmt.execute("CREATE TABLE TXNS (" +
" TXN_ID bigint PRIMARY KEY," +
@@ -140,8 +140,13 @@ public final class TxnDbUtil {
" CC_HIGHEST_TXN_ID bigint," +
" CC_META_INFO varchar(2048) for bit data," +
" CC_HADOOP_JOB_ID varchar(32))");
-
- conn.commit();
+
+ stmt.execute("CREATE TABLE AUX_TABLE (" +
+ " MT_KEY1 varchar(128) NOT NULL," +
+ " MT_KEY2 bigint NOT NULL," +
+ " MT_COMMENT varchar(255)," +
+ " PRIMARY KEY(MT_KEY1, MT_KEY2)" +
+ ")");
} catch (SQLException e) {
try {
conn.rollback();
@@ -166,7 +171,7 @@ public final class TxnDbUtil {
Connection conn = null;
Statement stmt = null;
try {
- conn = getConnection();
+ conn = getConnection(true);
stmt = conn.createStatement();
// We want to try these, whether they succeed or fail.
@@ -185,7 +190,7 @@ public final class TxnDbUtil {
dropTable(stmt, "COMPACTION_QUEUE");
dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
dropTable(stmt, "COMPLETED_COMPACTIONS");
- conn.commit();
+ dropTable(stmt, "AUX_TABLE");
} finally {
closeResources(conn, stmt, null);
}
@@ -249,6 +254,9 @@ public final class TxnDbUtil {
}
static Connection getConnection() throws Exception {
+ return getConnection(false);
+ }
+ static Connection getConnection(boolean isAutoCommit) throws Exception {
HiveConf conf = new HiveConf();
String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
@@ -260,7 +268,7 @@ public final class TxnDbUtil {
prop.setProperty("user", user);
prop.setProperty("password", passwd);
Connection conn = driver.connect(driverUrl, prop);
- conn.setAutoCommit(false);
+ conn.setAutoCommit(isAutoCommit);
return conn;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9789371..a3b0751 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -23,6 +23,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
import org.apache.commons.dbcp.ConnectionFactory;
import org.apache.commons.dbcp.DriverManagerConnectionFactory;
import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.commons.logging.Log;
@@ -45,6 +47,8 @@ import javax.sql.DataSource;
import java.io.IOException;
import java.sql.*;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -87,7 +91,7 @@ import java.util.concurrent.locks.ReentrantLock;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore {
+abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
static final protected char INITIATED_STATE = 'i';
static final protected char WORKING_STATE = 'w';
@@ -139,6 +143,12 @@ abstract class TxnHandler implements TxnStore {
* Derby specific concurrency control
*/
private static final ReentrantLock derbyLock = new ReentrantLock(true);
+ /**
+ * must be static since even in UT there may be > 1 instance of TxnHandler
+ * (e.g. via Compactor services)
+ */
+ private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+ private static final String hostname = ServerUtils.hostname();
// Private methods should never catch SQLException and then throw MetaException. The public
// methods depend on SQLException coming back so they can detect and handle deadlocks. Private
@@ -563,7 +573,7 @@ abstract class TxnHandler implements TxnStore {
* @throws MetaException
*/
private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
- String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? "AND TXN_STATE=" + quoteChar(txnState) : "");
+ String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
ResultSet rs = stmt.executeQuery(addForUpdateClause(query));
if(rs.next()) {
return rs;
@@ -1437,14 +1447,14 @@ abstract class TxnHandler implements TxnStore {
}
}
- void rollbackDBConn(Connection dbConn) {
+ static void rollbackDBConn(Connection dbConn) {
try {
if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
} catch (SQLException e) {
LOG.warn("Failed to rollback db connection " + getMessage(e));
}
}
- protected void closeDbConn(Connection dbConn) {
+ protected static void closeDbConn(Connection dbConn) {
try {
if (dbConn != null && !dbConn.isClosed()) {
dbConn.close();
@@ -1458,7 +1468,7 @@ abstract class TxnHandler implements TxnStore {
* Close statement instance.
* @param stmt statement instance.
*/
- protected void closeStmt(Statement stmt) {
+ protected static void closeStmt(Statement stmt) {
try {
if (stmt != null && !stmt.isClosed()) stmt.close();
} catch (SQLException e) {
@@ -1470,7 +1480,7 @@ abstract class TxnHandler implements TxnStore {
* Close the ResultSet.
* @param rs may be {@code null}
*/
- void close(ResultSet rs) {
+ static void close(ResultSet rs) {
try {
if (rs != null && !rs.isClosed()) {
rs.close();
@@ -1484,7 +1494,7 @@ abstract class TxnHandler implements TxnStore {
/**
* Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
*/
- void close(ResultSet rs, Statement stmt, Connection dbConn) {
+ static void close(ResultSet rs, Statement stmt, Connection dbConn) {
close(rs);
closeStmt(stmt);
closeDbConn(dbConn);
@@ -2635,6 +2645,40 @@ abstract class TxnHandler implements TxnStore {
}
return false;
}
+ private boolean isDuplicateKeyError(SQLException ex) {
+ switch (dbProduct) {
+ case DERBY:
+ if("23505".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case MYSQL:
+ if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case SQLSERVER:
+ //2627 is unique constaint violation incl PK, 2601 - unique key
+ if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case ORACLE:
+ if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case POSTGRES:
+ //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html
+ if("23505".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex));
+ }
+ return false;
+ }
private static String getMessage(SQLException ex) {
return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
}
@@ -2709,4 +2753,115 @@ abstract class TxnHandler implements TxnStore {
derbyLock.unlock();
}
}
+ @Override
+ public MutexAPI getMutexAPI() {
+ return this;
+ }
+
+ @Override
+ public LockHandle acquireLock(String key) throws MetaException {
+ /**
+ * The implementation here is a bit kludgey but done so that code exercised by unit tests
+ * (which run against Derby which has no support for select for update) is as similar to
+ * production code as possible.
+ * In particular, with Derby we always run in a single process with a single metastore and
+ * the absence of For Update is handled via a Semaphore. The later would strictly speaking
+ * make the SQL statments below unnecessary (for Derby), but then they would not be tested.
+ */
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ String sqlStmt = addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("About to execute SQL: " + sqlStmt);
+ }
+ rs = stmt.executeQuery(sqlStmt);
+ if (!rs.next()) {
+ close(rs);
+ try {
+ stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key) + ", 0)");
+ dbConn.commit();
+ } catch (SQLException ex) {
+ if (!isDuplicateKeyError(ex)) {
+ throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex), ex);
+ }
+ }
+ rs = stmt.executeQuery(sqlStmt);
+ if (!rs.next()) {
+ throw new IllegalStateException("Unable to lock " + quoteString(key) + ". Expected row in AUX_TABLE is missing.");
+ }
+ }
+ Semaphore derbySemaphore = null;
+ if(dbProduct == DatabaseProduct.DERBY) {
+ derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
+ derbySemaphore = derbyKey2Lock.get(key);
+ derbySemaphore.acquire();
+ }
+ LOG.info(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname));
+ //OK, so now we have a lock
+ return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
+ } catch (SQLException ex) {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ checkRetryable(dbConn, ex, "acquireLock(" + key + ")");
+ throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
+ }
+ catch(InterruptedException ex) {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage() + StringUtils.stringifyException(ex));
+ }
+ finally {
+ unlockInternal();
+ }
+ }
+ catch(RetryException ex) {
+ acquireLock(key);
+ }
+ throw new MetaException("This can't happen because checkRetryable() has a retry limit");
+ }
+ public void acquireLock(String key, LockHandle handle) {
+ //the idea is that this will use LockHandle.dbConn
+ throw new NotImplementedException();
+ }
+ private static final class LockHandleImpl implements LockHandle {
+ private final Connection dbConn;
+ private final Statement stmt;
+ private final ResultSet rs;
+ private final Semaphore derbySemaphore;
+ private final List<String> keys = new ArrayList<>();
+ LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, Semaphore derbySemaphore) {
+ this.dbConn = conn;
+ this.stmt = stmt;
+ this.rs = rs;
+ this.derbySemaphore = derbySemaphore;
+ if(derbySemaphore != null) {
+ //oterwise it may later release permit acquired by someone else
+ assert derbySemaphore.availablePermits() == 0 : "Expected locked Semaphore";
+ }
+ keys.add(key);
+ }
+ void addKey(String key) {
+ //keys.add(key);
+ //would need a list of (stmt,rs) pairs - 1 for each key
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void releaseLocks() {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ if(derbySemaphore != null) {
+ derbySemaphore.release();
+ }
+ for(String key : keys) {
+ LOG.info(quoteString(key) + " unlocked by " + quoteString(TxnHandler.hostname));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6fc6ed9..3aac11b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -74,6 +74,7 @@ import java.util.Set;
@InterfaceStability.Evolving
public interface TxnStore {
+ public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
// Compactor states (Should really be enum)
static final public String INITIATED_RESPONSE = "initiated";
static final public String WORKING_RESPONSE = "working";
@@ -355,10 +356,40 @@ public interface TxnStore {
*/
public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
-
@VisibleForTesting
public int numLocksInLockTable() throws SQLException, MetaException;
@VisibleForTesting
long setTimeout(long milliseconds);
+
+ public MutexAPI getMutexAPI();
+
+ /**
+ * This is primarily designed to provide coarse grained mutex support to operations running
+ * inside the Metastore (of which there could be several instances). The initial goal is to
+ * ensure that various sub-processes of the Compactor don't step on each other.
+ *
+ * In RDMBS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly.
+ */
+ public static interface MutexAPI {
+ /**
+ * The {@code key} is name of the lock. Will acquire and exclusive lock or block. It retuns
+ * a handle which must be used to release the lock. Each invocation returns a new handle.
+ */
+ public LockHandle acquireLock(String key) throws MetaException;
+
+ /**
+ * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This
+ * will associate the lock on {@code key} with the same handle. All locks associated with
+ * the same handle will be released together.
+ * @param handle not NULL
+ */
+ public void acquireLock(String key, LockHandle handle) throws MetaException;
+ public static interface LockHandle {
+ /**
+ * Releases all locks associcated with this handle.
+ */
+ public void releaseLocks();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index f60e34b..4c14eef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -75,8 +75,8 @@ public class TxnUtils {
int i = 0;
for (TxnInfo txn : txns.getOpen_txns()) {
if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
- exceptions[i++] = txn.getId();
- }
+ exceptions[i++] = txn.getId();//todo: only add Aborted
+ }//remove all exceptions < minOpenTxn
highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
return new ValidCompactorTxnList(exceptions, -1, highWater);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 648fd49..30bdfa7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
import java.util.Arrays;
/**
- * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
* For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
* is committed or aborted. Additionally it will return none if there are any open transactions
* below the max transaction given, since we don't want to compact above open transactions. For
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index b8cab71..6033c15 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.api.*;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.*;
+import org.apache.hadoop.util.StringUtils;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -33,6 +34,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
@@ -1214,6 +1216,97 @@ public class TestTxnHandler {
}
}
+ /**
+ * This cannnot be run against Derby (thus in UT) but it can run againt MySQL.
+ * 1. add to metastore/pom.xml
+ * <dependency>
+ * <groupId>mysql</groupId>
+ * <artifactId>mysql-connector-java</artifactId>
+ * <version>5.1.30</version>
+ * </dependency>
+ * 2. Hack in the c'tor of this class
+ * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
+ * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
+ * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
+ * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
+ * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
+ *
+ */
+ @Ignore("multiple threads wedge Derby")
+ @Test
+ public void testMutexAPI() throws Exception {
+ final TxnStore.MutexAPI api = txnHandler.getMutexAPI();
+ final AtomicInteger stepTracker = new AtomicInteger(0);
+ /**
+ * counter = 0;
+ * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock
+ * Thread2 counter=2, lock (and block), inc counter, should be 4
+ */
+ Thread t1 = new Thread("MutexTest1") {
+ public void run() {
+ try {
+ stepTracker.incrementAndGet();//now 1
+ TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+ Thread.sleep(4000);
+ //stepTracker should now be 2 which indicates t2 has started
+ Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
+ stepTracker.incrementAndGet();//now 3
+ handle.releaseLocks();
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+ };
+ t1.setDaemon(true);
+ ErrorHandle ueh1 = new ErrorHandle();
+ t1.setUncaughtExceptionHandler(ueh1);
+ Thread t2 = new Thread("MutexTest2") {
+ public void run() {
+ try {
+ stepTracker.incrementAndGet();//now 2
+ //this should block until t1 unlocks
+ TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+ stepTracker.incrementAndGet();//now 4
+ Assert.assertEquals(4, stepTracker.get());
+ handle.releaseLocks();
+ stepTracker.incrementAndGet();//now 5
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+ };
+ t2.setDaemon(true);
+ ErrorHandle ueh2 = new ErrorHandle();
+ t2.setUncaughtExceptionHandler(ueh2);
+ t1.start();
+ try {
+ Thread.sleep(1000);
+ }
+ catch(InterruptedException ex) {
+ LOG.info("Sleep was interrupted");
+ }
+ t2.start();
+ t1.join(6000);//so that test doesn't block
+ t2.join(6000);
+
+ if(ueh1.error != null) {
+ Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
+ }
+ if (ueh2.error != null) {
+ Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
+ }
+ Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
+ }
+ private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
+ Throwable error = null;
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
+ error = e;
+ }
+ }
private void updateTxns(Connection conn) throws SQLException {
Statement stmt = conn.createStatement();
stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index 59c8fe4..5d9e7be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -61,7 +61,9 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
@Override
public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
long startTime = System.currentTimeMillis();
txnHandler.purgeCompactionHistory();
int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
catch(Throwable t) {
LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
}
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index de74a7b..f39df17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -61,7 +61,9 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
}
@Override
public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
long startTime = System.currentTimeMillis();
txnHandler.performTimeOuts();
int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
catch(Throwable t) {
LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);
}
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 33580fd..1e6e8a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -72,11 +73,13 @@ public class Cleaner extends CompactorThread {
// and if so remembers that and then sets it to true at the end. We have to check here
// first to make sure we go through a complete iteration of the loop before resetting it.
boolean setLooped = !looped.get();
- long startedAt = System.currentTimeMillis();
+ TxnStore.MutexAPI.LockHandle handle = null;
+ long startedAt = -1;
// Make sure nothing escapes this run method and kills the metastore at large,
// so wrap it in a big catch Throwable statement.
try {
-
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+ startedAt = System.currentTimeMillis();
// First look for all the compactions that are waiting to be cleaned. If we have not
// seen an entry before, look for all the locks held on that table or partition and
// record them. We will then only clean the partition once all of those locks have been
@@ -86,6 +89,31 @@ public class Cleaner extends CompactorThread {
// done the compaction will read the more up to date version of the data (either in a
// newer delta or in a newer base).
List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+ {
+ /**
+ * Since there may be more than 1 instance of Cleaner running we may have state info
+ * for items which were cleaned by instances. Here we remove them.
+ *
+ * In the long run if we add end_time to compaction_queue, then we can check that
+ * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
+ * we know the lock owner is reading files created by this compaction or later.
+ * The advantage is that we don't have to store the locks.
+ */
+ Set<Long> currentToCleanSet = new HashSet<>();
+ for (CompactionInfo ci : toClean) {
+ currentToCleanSet.add(ci.id);
+ }
+ Set<Long> cleanPerformedByOthers = new HashSet<>();
+ for (long id : compactId2CompactInfoMap.keySet()) {
+ if (!currentToCleanSet.contains(id)) {
+ cleanPerformedByOthers.add(id);
+ }
+ }
+ for (long id : cleanPerformedByOthers) {
+ compactId2CompactInfoMap.remove(id);
+ compactId2LockMap.remove(id);
+ }
+ }
if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
@@ -119,6 +147,7 @@ public class Cleaner extends CompactorThread {
// Remember to remove this when we're out of the loop,
// we can't do it in the loop or we'll get a concurrent modification exception.
compactionsCleaned.add(queueEntry.getKey());
+ //Future thought: this may be expensive so consider having a thread pool run in parallel
clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
} else {
// Remove the locks we didn't see so we don't look for them again next time
@@ -140,6 +169,11 @@ public class Cleaner extends CompactorThread {
LOG.error("Caught an exception in the main loop of compactor cleaner, " +
StringUtils.stringifyException(t));
}
+ finally {
+ if (handle != null) {
+ handle.releaseLocks();
+ }
+ }
if (setLooped) {
looped.set(true);
}
@@ -206,10 +240,24 @@ public class Cleaner extends CompactorThread {
StorageDescriptor sd = resolveStorageDescriptor(t, p);
final String location = sd.getLocation();
- // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
- // transactions. This assures that all deltas are treated as valid and all we return are
- // obsolete files.
- final ValidTxnList txnList = new ValidReadTxnList();
+ /**
+ * Each Compaction only compacts as far as the highest txn id such that all txns below it
+ * are resolved (i.e. not opened). This is what "highestTxnId" tracks. This is only tracked
+ * since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorTxnList and uses for more info.
+ *
+ * We only want to clean up to the highestTxnId - otherwise we risk deleteing deltas from
+ * under an active reader.
+ *
+ * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
+ * clean request for D2.
+ * Cleaner checks existing locks and finds none.
+ * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
+ * completes which creates D4.
+ * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
+ * unless ValidTxnList is "capped" at highestTxnId.
+ */
+ final ValidTxnList txnList = ci.highestTxnId > 0 ?
+ new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();
if (runJobAsSelf(ci.runAs)) {
removeFiles(location, txnList);
@@ -249,7 +297,7 @@ public class Cleaner extends CompactorThread {
FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
for (Path dead : filesToDelete) {
- LOG.debug("Doing to delete path " + dead.toString());
+ LOG.debug("Going to delete path " + dead.toString());
fs.delete(dead, true);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 1898a4d..0e4ba06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -74,11 +74,15 @@ public class Initiator extends CompactorThread {
// much easier. The stop value is only for testing anyway and not used when called from
// HiveMetaStore.
do {
- long startedAt = System.currentTimeMillis();
+ long startedAt = -1;
+ TxnStore.MutexAPI.LockHandle handle = null;
// Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
// don't doom the entire thread.
- try {//todo: add method to only get current i.e. skip history - more efficient
+ try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+ startedAt = System.currentTimeMillis();
+ //todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
ValidTxnList txns =
TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
@@ -114,6 +118,8 @@ public class Initiator extends CompactorThread {
// Check if we already have initiated or are working on a compaction for this partition
// or table. If so, skip it. If we are just waiting on cleaning we can still check,
// as it may be time to compact again even though we haven't cleaned.
+ //todo: this is not robust. You can easily run Alter Table to start a compaction between
+ //the time currentCompactions is generated and now
if (lookForCurrentCompactions(currentCompactions, ci)) {
LOG.debug("Found currently initiated or working compaction for " +
ci.getFullPartitionName() + " so we will not initiate another compaction");
@@ -134,7 +140,9 @@ public class Initiator extends CompactorThread {
}
StorageDescriptor sd = resolveStorageDescriptor(t, p);
String runAs = findUserToRunAs(sd.getLocation(), t);
-
+ /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
+ * Long term we should consider having a thread pool here and running checkForCompactionS
+ * in parallel*/
CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
} catch (Throwable t) {
@@ -154,6 +162,11 @@ public class Initiator extends CompactorThread {
LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
StringUtils.stringifyException(t));
}
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
long elapsedTime = System.currentTimeMillis() - startedAt;
if (elapsedTime >= checkInterval || stop.get()) continue;
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6f8dc35..8394ec6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -229,7 +229,6 @@ public class TestTxnCommands2 {
}
Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
}
- @Ignore("alter table")
@Test
public void testAlterTable() throws Exception {
int[][] tableData = {{1,2}};
@@ -604,7 +603,13 @@ public class TestTxnCommands2 {
private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
int lastCount = houseKeeperService.getIsAliveCounter();
houseKeeperService.start(conf);
+ int maxIter = 10;
+ int iterCount = 0;
while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+ if(iterCount++ >= maxIter) {
+ //prevent test hangs
+ throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits");
+ }
try {
Thread.sleep(100);//make sure it has run at least once
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 99705b4..b355dbe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -194,7 +194,13 @@ public class TestDbTxnManager {
private void runReaper() throws Exception {
int lastCount = houseKeeperService.getIsAliveCounter();
houseKeeperService.start(conf);
+ int maxIter = 10;
+ int iterCount = 0;
while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+ if(iterCount++ >= maxIter) {
+ //prevent test hangs
+ throw new IllegalStateException("Reaper didn't run after " + iterCount + " waits");
+ }
try {
Thread.sleep(100);//make sure it has run at least once
}
[2/3] hive git commit: HIVE-13344 - port HIVE-12902 to 1.x line
(Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
HIVE-13344 - port HIVE-12902 to 1.x line (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c8295051
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c8295051
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c8295051
Branch: refs/heads/branch-1
Commit: c8295051cc26577dcc1eb17709d4ffc0f9784c5b
Parents: db2efe4
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Mar 24 16:21:07 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Mar 24 16:21:07 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
.../hive/ql/txn/compactor/TestCompactor.java | 25 +-
.../hive/metastore/AcidEventListener.java | 38 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 14 +-
.../hive/metastore/HiveMetaStoreClient.java | 6 +-
.../metastore/txn/CompactionTxnHandler.java | 47 +--
.../hadoop/hive/metastore/txn/TxnHandler.java | 127 +------
.../hadoop/hive/metastore/txn/TxnStore.java | 364 +++++++++++++++++++
.../hadoop/hive/metastore/txn/TxnUtils.java | 209 +++++++++++
.../metastore/txn/TestCompactionTxnHandler.java | 5 +-
.../hive/metastore/txn/TestTxnHandler.java | 183 +++++-----
.../metastore/txn/TestTxnHandlerNegative.java | 2 +-
.../ql/txn/AcidCompactionHistoryService.java | 16 +-
.../hive/ql/txn/AcidHouseKeeperService.java | 13 +-
.../hive/ql/txn/compactor/CompactorThread.java | 7 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 8 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 12 +-
.../hive/ql/txn/compactor/CompactorTest.java | 34 +-
.../hive/ql/txn/compactor/TestCleaner.java | 20 +-
.../hive/ql/txn/compactor/TestInitiator.java | 7 +-
.../hive/ql/txn/compactor/TestWorker.java | 13 +-
23 files changed, 847 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b78bea2..f84c940 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -39,6 +39,7 @@ import java.util.regex.Pattern;
import javax.security.auth.login.LoginException;
+import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,7 +57,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.HiveCompat;
-import com.google.common.base.Joiner;
/**
* Hive Configuration.
@@ -607,6 +607,10 @@ public class HiveConf extends Configuration {
METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
"Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" +
"This class is used to store and retrieval of raw metadata objects such as table, database"),
+ METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl",
+ "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler",
+ "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " +
+ "class is used to store and retrieve transactions and locks"),
METASTORE_CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver",
"Driver class name for a JDBC metastore"),
METASTORE_MANAGER_FACTORY_CLASS("javax.jdo.PersistenceManagerFactoryClass",
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 9c0f374..37bbab8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -8,7 +8,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -20,10 +19,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -187,7 +186,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(4, compacts.size());
@@ -290,7 +289,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(4, compacts.size());
@@ -363,7 +362,7 @@ public class TestCompactor {
execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
tblName + " after load:");
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
@@ -498,7 +497,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
@@ -538,7 +537,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
@@ -580,7 +579,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
@@ -620,7 +619,7 @@ public class TestCompactor {
writeBatch(connection, writer, true);
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -682,7 +681,7 @@ public class TestCompactor {
writeBatch(connection, writer, true);
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -738,7 +737,7 @@ public class TestCompactor {
txnBatch.abort();
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -804,7 +803,7 @@ public class TestCompactor {
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index 767bc54..b241e9e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
/**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnHandler;
*/
public class AcidEventListener extends MetaStoreEventListener {
- private TxnHandler txnHandler;
+ private TxnStore txnHandler;
private HiveConf hiveConf;
public AcidEventListener(Configuration configuration) {
@@ -46,24 +47,47 @@ public class AcidEventListener extends MetaStoreEventListener {
// We can loop thru all the tables to check if they are ACID first and then perform cleanup,
// but it's more efficient to unconditionally perform cleanup for the database, especially
// when there are a lot of tables
- txnHandler = new TxnHandler(hiveConf);
+ txnHandler = getTxnHandler();
txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
}
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
- if (TxnHandler.isAcidTable(tableEvent.getTable())) {
- txnHandler = new TxnHandler(hiveConf);
+ if (TxnUtils.isAcidTable(tableEvent.getTable())) {
+ txnHandler = getTxnHandler();
txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
}
}
@Override
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
- if (TxnHandler.isAcidTable(partitionEvent.getTable())) {
- txnHandler = new TxnHandler(hiveConf);
+ if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
+ txnHandler = getTxnHandler();
txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
partitionEvent.getPartitionIterator());
}
}
+ private TxnStore getTxnHandler() {
+ boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+ HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+ String origTxnMgr = null;
+ boolean origConcurrency = false;
+
+ // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
+ // which may change the values of below two entries, we need to avoid pulluting the original values
+ if (hackOn) {
+ origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+ origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ }
+
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+ // Set them back
+ if (hackOn) {
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
+ }
+
+ return txnHandler;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index fba545d..bf65532 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
-
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -176,7 +175,8 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap;
import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.shims.HadoopShims;
@@ -308,9 +308,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
};
- private final ThreadLocal<TxnHandler> threadLocalTxn = new ThreadLocal<TxnHandler>() {
+ private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>() {
@Override
- protected synchronized TxnHandler initialValue() {
+ protected TxnStore initialValue() {
return null;
}
};
@@ -584,10 +584,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return ms;
}
- private TxnHandler getTxnHandler() {
- TxnHandler txn = threadLocalTxn.get();
+ private TxnStore getTxnHandler() {
+ TxnStore txn = threadLocalTxn.get();
if (txn == null) {
- txn = new TxnHandler(hiveConf);
+ txn = TxnUtils.getTxnStore(hiveConf);
threadLocalTxn.set(txn);
}
return txn;
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 393ef3b..50bf43c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -134,7 +134,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
@@ -1846,12 +1846,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
@Override
public ValidTxnList getValidTxns() throws TException {
- return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0);
+ return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0);
}
@Override
public ValidTxnList getValidTxns(long currentTxn) throws TException {
- return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn);
+ return TxnUtils.createValidReadTxnList(client.get_open_txns(), currentTxn);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 28e06ed..f7c738a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -20,27 +20,33 @@ package org.apache.hadoop.hive.metastore.txn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.util.StringUtils;
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
/**
* Extends the transaction handler with methods needed only by the compactor threads. These
* methods are not available through the thrift interface.
*/
-public class CompactionTxnHandler extends TxnHandler {
+class CompactionTxnHandler extends TxnHandler {
static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
// Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
// See TxnHandler for notes on how to deal with deadlocks. Follow those notes.
- public CompactionTxnHandler(HiveConf conf) {
- super(conf);
+ public CompactionTxnHandler() {
}
/**
@@ -385,7 +391,7 @@ public class CompactionTxnHandler extends TxnHandler {
}
// Populate the complete query with provided prefix and suffix
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -450,7 +456,7 @@ public class CompactionTxnHandler extends TxnHandler {
prefix.append("delete from TXNS where ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -620,27 +626,6 @@ public class CompactionTxnHandler extends TxnHandler {
}
/**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
- * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
- * compact the files, and thus treats only open transactions as invalid. Additionally any
- * txnId > highestOpenTxnId is also invalid. This is avoid creating something like
- * delta_17_120 where txnId 80, for example, is still open.
- * @param txns txn list from the metastore
- * @return a valid txn list.
- */
- public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
- long highWater = txns.getTxn_high_water_mark();
- long minOpenTxn = Long.MAX_VALUE;
- long[] exceptions = new long[txns.getOpen_txnsSize()];
- int i = 0;
- for (TxnInfo txn : txns.getOpen_txns()) {
- if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
- exceptions[i++] = txn.getId();
- }
- highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
- return new ValidCompactorTxnList(exceptions, -1, highWater);
- }
- /**
* Record the highest txn id that the {@code ci} compaction job will pay attention to.
*/
public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
@@ -746,7 +731,7 @@ public class CompactionTxnHandler extends TxnHandler {
prefix.append("delete from COMPLETED_COMPACTIONS where ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0ddc078..9789371 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -87,14 +87,7 @@ import java.util.concurrent.locks.ReentrantLock;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class TxnHandler {
- // Compactor states (Should really be enum)
- static final public String INITIATED_RESPONSE = "initiated";
- static final public String WORKING_RESPONSE = "working";
- static final public String CLEANING_RESPONSE = "ready for cleaning";
- static final public String FAILED_RESPONSE = "failed";
- static final public String SUCCEEDED_RESPONSE = "succeeded";
- static final public String ATTEMPTED_RESPONSE = "attempted";
+abstract class TxnHandler implements TxnStore {
static final protected char INITIATED_STATE = 'i';
static final protected char WORKING_STATE = 'w';
@@ -131,7 +124,7 @@ public class TxnHandler {
* Number of consecutive deadlocks we have seen
*/
private int deadlockCnt;
- private final long deadlockRetryInterval;
+ private long deadlockRetryInterval;
protected HiveConf conf;
protected DatabaseProduct dbProduct;
@@ -139,8 +132,8 @@ public class TxnHandler {
private long timeout;
private String identifierQuoteString; // quotes to use for quoting tables, where necessary
- private final long retryInterval;
- private final int retryLimit;
+ private long retryInterval;
+ private int retryLimit;
private int retryNum;
/**
* Derby specific concurrency control
@@ -157,7 +150,10 @@ public class TxnHandler {
// in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction,
// and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
- public TxnHandler(HiveConf conf) {
+ public TxnHandler() {
+ }
+
+ public void setConf(HiveConf conf) {
this.conf = conf;
checkQFileTestHack();
@@ -183,7 +179,6 @@ public class TxnHandler {
TimeUnit.MILLISECONDS);
retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
deadlockRetryInterval = retryInterval / 10;
-
}
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -1211,6 +1206,7 @@ public class TxnHandler {
* Clean up corresponding records in metastore tables, specifically:
* TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
*/
+ @Override
public void cleanupRecords(HiveObjectType type, Database db, Table table,
Iterator<Partition> partitionIterator) throws MetaException {
try {
@@ -1386,106 +1382,11 @@ public class TxnHandler {
String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
-
- /**
- * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
- * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6)
- * For NOT IN case, NOT IN list is broken into multiple AND clauses.
- * @param queries array of complete query strings
- * @param prefix part of the query that comes before IN list
- * @param suffix part of the query that comes after IN list
- * @param inList the list containing IN list values
- * @param inColumn column name of IN list operator
- * @param addParens add a pair of parenthesis outside the IN lists
- * e.g. ( id in (1,2,3) OR id in (4,5,6) )
- * @param notIn clause to be broken up is NOT IN
- */
- public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
- StringBuilder suffix, List<Long> inList,
- String inColumn, boolean addParens, boolean notIn) {
- int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
- int numWholeBatches = inList.size() / batchSize;
- StringBuilder buf = new StringBuilder();
- buf.append(prefix);
- if (addParens) {
- buf.append("(");
- }
- buf.append(inColumn);
- if (notIn) {
- buf.append(" not in (");
- } else {
- buf.append(" in (");
- }
-
- for (int i = 0; i <= numWholeBatches; i++) {
- if (needNewQuery(conf, buf)) {
- // Wrap up current query string
- if (addParens) {
- buf.append(")");
- }
- buf.append(suffix);
- queries.add(buf.toString());
-
- // Prepare a new query string
- buf.setLength(0);
- }
-
- if (i > 0) {
- if (notIn) {
- if (buf.length() == 0) {
- buf.append(prefix);
- if (addParens) {
- buf.append("(");
- }
- } else {
- buf.append(" and ");
- }
- buf.append(inColumn);
- buf.append(" not in (");
- } else {
- if (buf.length() == 0) {
- buf.append(prefix);
- if (addParens) {
- buf.append("(");
- }
- } else {
- buf.append(" or ");
- }
- buf.append(inColumn);
- buf.append(" in (");
- }
- }
-
- if (i * batchSize == inList.size()) {
- // At this point we just realized we don't need another query
- return;
- }
- for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
- buf.append(inList.get(j)).append(",");
- }
- buf.setCharAt(buf.length() - 1, ')');
- }
-
- if (addParens) {
- buf.append(")");
- }
- buf.append(suffix);
- queries.add(buf.toString());
- }
-
- /** Estimate if the size of a string will exceed certain limit */
- private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
- int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
- // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
- long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
- return sizeInBytes / 1024 > queryMemoryLimit;
- }
-
/**
* For testing only, do not use.
*/
@VisibleForTesting
- int numLocksInLockTable() throws SQLException, MetaException {
+ public int numLocksInLockTable() throws SQLException, MetaException {
Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
@@ -1508,7 +1409,7 @@ public class TxnHandler {
/**
* For testing only, do not use.
*/
- long setTimeout(long milliseconds) {
+ public long setTimeout(long milliseconds) {
long previous_timeout = timeout;
timeout = milliseconds;
return previous_timeout;
@@ -1975,7 +1876,7 @@ public class TxnHandler {
suffix.append("");
}
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -1998,7 +1899,7 @@ public class TxnHandler {
prefix.append("delete from HIVE_LOCKS where ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
@@ -2435,7 +2336,7 @@ public class TxnHandler {
prefix.append(" and hl_txnid = 0 and ");
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
int deletedLocks = 0;
for (String query : queries) {
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
new file mode 100644
index 0000000..6fc6ed9
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A handler to answer transaction related calls that come into the metastore
+ * server.
+ *
+ * Note on log messages: Please include txnid:X and lockid info using
+ * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
+ * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
+ * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
+ * so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat.
+ * For locks that are part of transaction, we set this 0 (would rather set it to NULL but
+ * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
+ * transaction in TXNS.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface TxnStore {
+
+ // Compactor states (Should really be enum)
+ static final public String INITIATED_RESPONSE = "initiated";
+ static final public String WORKING_RESPONSE = "working";
+ static final public String CLEANING_RESPONSE = "ready for cleaning";
+ static final public String FAILED_RESPONSE = "failed";
+ static final public String SUCCEEDED_RESPONSE = "succeeded";
+ static final public String ATTEMPTED_RESPONSE = "attempted";
+
+ public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
+
+ public void setConf(HiveConf conf);
+
+ /**
+ * Get information about open transactions. This gives extensive information about the
+ * transactions rather than just the list of transactions. This should be used when the need
+ * is to see information about the transactions (e.g. show transactions).
+ * @return information about open transactions
+ * @throws MetaException
+ */
+ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
+
+ /**
+ * Get list of valid transactions. This gives just the list of transactions that are open.
+ * @return list of open transactions, as well as a high water mark.
+ * @throws MetaException
+ */
+ public GetOpenTxnsResponse getOpenTxns() throws MetaException;
+
+ /**
+ * Open a set of transactions
+ * @param rqst request to open transactions
+ * @return information on opened transactions
+ * @throws MetaException
+ */
+ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+
+ /**
+ * Abort (rollback) a transaction.
+ * @param rqst info on transaction to abort
+ * @throws NoSuchTxnException
+ * @throws MetaException
+ */
+ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException;
+
+ /**
+ * Commit a transaction
+ * @param rqst info on transaction to commit
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public void commitTxn(CommitTxnRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Obtain a lock.
+ * @param rqst information on the lock to obtain. If the requester is part of a transaction
+ * the txn information must be included in the lock request.
+ * @return info on the lock, including whether it was obtained.
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public LockResponse lock(LockRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait
+ * state.
+ * @param rqst info on the lock to check
+ * @return info on the state of the lock
+ * @throws NoSuchTxnException
+ * @throws NoSuchLockException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public LockResponse checkLock(CheckLockRequest rqst)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+ /**
+ * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case
+ * the txn should be committed or aborted instead. (Note someday this will change since
+ * multi-statement transactions will allow unlocking in the transaction.)
+ * @param rqst lock to unlock
+ * @throws NoSuchLockException
+ * @throws TxnOpenException
+ * @throws MetaException
+ */
+ public void unlock(UnlockRequest rqst)
+ throws NoSuchLockException, TxnOpenException, MetaException;
+
+ /**
+ * Get information on current locks.
+ * @param rqst lock information to retrieve
+ * @return lock information.
+ * @throws MetaException
+ */
+ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
+
+ /**
+ * Send a heartbeat for a lock or a transaction
+ * @param ids lock and/or txn id to heartbeat
+ * @throws NoSuchTxnException
+ * @throws NoSuchLockException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public void heartbeat(HeartbeatRequest ids)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+ /**
+ * Heartbeat a group of transactions together
+ * @param rqst set of transactions to heartbat
+ * @return info on txns that were heartbeated
+ * @throws MetaException
+ */
+ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+ throws MetaException;
+
+ /**
+ * Submit a compaction request into the queue. This is called when a user manually requests a
+ * compaction.
+ * @param rqst information on what to compact
+ * @return id of the compaction that has been started
+ * @throws MetaException
+ */
+ public long compact(CompactionRequest rqst) throws MetaException;
+
+ /**
+ * Show list of current compactions
+ * @param rqst info on which compactions to show
+ * @return compaction information
+ * @throws MetaException
+ */
+ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
+
+ /**
+ * Add information on a set of dynamic partitions that participated in a transaction.
+ * @param rqst dynamic partition info.
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public void addDynamicPartitions(AddDynamicPartitions rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Clean up corresponding records in metastore tables
+ * @param type Hive object type
+ * @param db database object
+ * @param table table object
+ * @param partitionIterator partition iterator
+ * @throws MetaException
+ */
+ public void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator) throws MetaException;
+ /**
+ * Timeout transactions and/or locks. This should only be called by the compactor.
+ */
+ public void performTimeOuts();
+
+ /**
+ * This will look through the completed_txn_components table and look for partitions or tables
+ * that may be ready for compaction. Also, look through txns and txn_components tables for
+ * aborted transactions that we should add to the list.
+ * @param maxAborted Maximum number of aborted queries to allow before marking this as a
+ * potential compaction.
+ * @return list of CompactionInfo structs. These will not have id, type,
+ * or runAs set since these are only potential compactions not actual ones.
+ */
+ public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+
+ /**
+ * Sets the user to run as. This is for the case
+ * where the request was generated by the user and so the worker must set this value later.
+ * @param cq_id id of this entry in the queue
+ * @param user user to run the jobs as
+ */
+ public void setRunAs(long cq_id, String user) throws MetaException;
+
+ /**
+ * This will grab the next compaction request off of
+ * the queue, and assign it to the worker.
+ * @param workerId id of the worker calling this, will be recorded in the db
+ * @return an info element for this compaction request, or null if there is no work to do now.
+ */
+ public CompactionInfo findNextToCompact(String workerId) throws MetaException;
+
+ /**
+ * This will mark an entry in the queue as compacted
+ * and put it in the ready to clean state.
+ * @param info info on the compaction entry to mark as compacted.
+ */
+ public void markCompacted(CompactionInfo info) throws MetaException;
+
+ /**
+ * Find entries in the queue that are ready to
+ * be cleaned.
+ * @return information on the entry in the queue.
+ */
+ public List<CompactionInfo> findReadyToClean() throws MetaException;
+
+ /**
+ * This will remove an entry from the queue after
+ * it has been compacted.
+ *
+ * @param info info on the compaction entry to remove
+ */
+ public void markCleaned(CompactionInfo info) throws MetaException;
+
+ /**
+ * Mark a compaction entry as failed. This will move it to the compaction history queue with a
+ * failed status. It will NOT clean up aborted transactions in the table/partition associated
+ * with this compaction.
+ * @param info information on the compaction that failed.
+ * @throws MetaException
+ */
+ public void markFailed(CompactionInfo info) throws MetaException;
+
+ /**
+ * Clean up aborted transactions from txns that have no components in txn_components. The reson such
+ * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
+ * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+ */
+ public void cleanEmptyAbortedTxns() throws MetaException;
+
+ /**
+ * This will take all entries assigned to workers
+ * on a host return them to INITIATED state. The initiator should use this at start up to
+ * clean entries from any workers that were in the middle of compacting when the metastore
+ * shutdown. It does not reset entries from worker threads on other hosts as those may still
+ * be working.
+ * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
+ * so that like hostname% will match the worker id.
+ */
+ public void revokeFromLocalWorkers(String hostname) throws MetaException;
+
+ /**
+ * This call will return all compaction queue
+ * entries assigned to a worker but over the timeout back to the initiated state.
+ * This should be called by the initiator on start up and occasionally when running to clean up
+ * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
+ * first.
+ * @param timeout number of milliseconds since start time that should elapse before a worker is
+ * declared dead.
+ */
+ public void revokeTimedoutWorkers(long timeout) throws MetaException;
+
+ /**
+ * Queries metastore DB directly to find columns in the table which have statistics information.
+ * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+ * table level stats are examined.
+ * @throws MetaException
+ */
+ public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
+
+ /**
+ * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+ */
+ public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException;
+
+ /**
+ * For any given compactable entity (partition, table if not partitioned) the history of compactions
+ * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
+ * history such that a configurable number of each type of state is present. Any other entries
+ * can be purged. This scheme has advantage of always retaining the last failure/success even if
+ * it's not recent.
+ * @throws MetaException
+ */
+ public void purgeCompactionHistory() throws MetaException;
+
+ /**
+ * Determine if there are enough consecutive failures compacting a table or partition that no
+ * new automatic compactions should be scheduled. User initiated compactions do not do this
+ * check.
+ * @param ci Table or partition to check.
+ * @return true if it is ok to compact, false if there have been too many failures.
+ * @throws MetaException
+ */
+ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
+
+
+ @VisibleForTesting
+ public int numLocksInLockTable() throws SQLException, MetaException;
+
+ @VisibleForTesting
+ long setTimeout(long milliseconds);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
new file mode 100644
index 0000000..f60e34b
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TxnUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted transactions as invalid.
+ * @param txns txn list from the metastore
+ * @param currentTxn Current transaction that the user has open. If this is greater than 0 it
+ * will be removed from the exceptions list so that the user sees his own
+ * transaction as valid.
+ * @return a valid txn list.
+ */
+ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+ long highWater = txns.getTxn_high_water_mark();
+ Set<Long> open = txns.getOpen_txns();
+ long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
+ int i = 0;
+ for(long txn: open) {
+ if (currentTxn > 0 && currentTxn == txn) continue;
+ exceptions[i++] = txn;
+ }
+ return new ValidReadTxnList(exceptions, highWater);
+ }
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+ * compact the files, and thus treats only open transactions as invalid. Additionally any
+ * txnId > highestOpenTxnId is also invalid. This is avoid creating something like
+ * delta_17_120 where txnId 80, for example, is still open.
+ * @param txns txn list from the metastore
+ * @return a valid txn list.
+ */
+ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+ long highWater = txns.getTxn_high_water_mark();
+ long minOpenTxn = Long.MAX_VALUE;
+ long[] exceptions = new long[txns.getOpen_txnsSize()];
+ int i = 0;
+ for (TxnInfo txn : txns.getOpen_txns()) {
+ if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
+ exceptions[i++] = txn.getId();
+ }
+ highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+ return new ValidCompactorTxnList(exceptions, -1, highWater);
+ }
+
+ /**
+ * Get an instance of the TxnStore that is appropriate for this store
+ * @param conf configuration
+ * @return txn store
+ */
+ public static TxnStore getTxnStore(HiveConf conf) {
+ String className = conf.getVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL);
+ try {
+ TxnStore handler = ((Class<? extends TxnHandler>) MetaStoreUtils.getClass(
+ className)).newInstance();
+ handler.setConf(conf);
+ return handler;
+ } catch (Exception e) {
+ LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Checks if a table is a valid ACID table.
+ * Note, users are responsible for using the correct TxnManager. We do not look at
+ * SessionState.get().getTxnMgr().supportsAcid() here
+ * @param table table
+ * @return true if table is a legit ACID table, false otherwise
+ */
+ public static boolean isAcidTable(Table table) {
+ if (table == null) {
+ return false;
+ }
+ Map<String, String> parameters = table.getParameters();
+ String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+ }
+ /**
+ * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
+ * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6)
+ * For NOT IN case, NOT IN list is broken into multiple AND clauses.
+ * @param queries array of complete query strings
+ * @param prefix part of the query that comes before IN list
+ * @param suffix part of the query that comes after IN list
+ * @param inList the list containing IN list values
+ * @param inColumn column name of IN list operator
+ * @param addParens add a pair of parenthesis outside the IN lists
+ * e.g. ( id in (1,2,3) OR id in (4,5,6) )
+ * @param notIn clause to be broken up is NOT IN
+ */
+ public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
+ StringBuilder suffix, List<Long> inList,
+ String inColumn, boolean addParens, boolean notIn) {
+ int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
+ int numWholeBatches = inList.size() / batchSize;
+ StringBuilder buf = new StringBuilder();
+ buf.append(prefix);
+ if (addParens) {
+ buf.append("(");
+ }
+ buf.append(inColumn);
+ if (notIn) {
+ buf.append(" not in (");
+ } else {
+ buf.append(" in (");
+ }
+
+ for (int i = 0; i <= numWholeBatches; i++) {
+ if (needNewQuery(conf, buf)) {
+ // Wrap up current query string
+ if (addParens) {
+ buf.append(")");
+ }
+ buf.append(suffix);
+ queries.add(buf.toString());
+
+ // Prepare a new query string
+ buf.setLength(0);
+ }
+
+ if (i > 0) {
+ if (notIn) {
+ if (buf.length() == 0) {
+ buf.append(prefix);
+ if (addParens) {
+ buf.append("(");
+ }
+ } else {
+ buf.append(" and ");
+ }
+ buf.append(inColumn);
+ buf.append(" not in (");
+ } else {
+ if (buf.length() == 0) {
+ buf.append(prefix);
+ if (addParens) {
+ buf.append("(");
+ }
+ } else {
+ buf.append(" or ");
+ }
+ buf.append(inColumn);
+ buf.append(" in (");
+ }
+ }
+
+ if (i * batchSize == inList.size()) {
+ // At this point we just realized we don't need another query
+ return;
+ }
+ for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
+ buf.append(inList.get(j)).append(",");
+ }
+ buf.setCharAt(buf.length() - 1, ')');
+ }
+
+ if (addParens) {
+ buf.append(")");
+ }
+ buf.append(suffix);
+ queries.add(buf.toString());
+ }
+
+ /** Estimate if the size of a string will exceed certain limit */
+ private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
+ int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
+ // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
+ long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
+ return sizeInBytes / 1024 > queryMemoryLimit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 051da60..bdeacb9 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -42,8 +42,7 @@ import static junit.framework.Assert.*;
public class TestCompactionTxnHandler {
private HiveConf conf = new HiveConf();
- private CompactionTxnHandler txnHandler;
- static final private Log LOG = LogFactory.getLog(TestCompactionTxnHandler.class);
+ private TxnStore txnHandler;
public TestCompactionTxnHandler() throws Exception {
TxnDbUtil.setConfValues(conf);
@@ -424,7 +423,7 @@ public class TestCompactionTxnHandler {
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 930af7c..b8cab71 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -34,7 +34,11 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static junit.framework.Assert.*;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
/**
* Tests for TxnHandler.
@@ -44,7 +48,7 @@ public class TestTxnHandler {
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
private HiveConf conf = new HiveConf();
- private TxnHandler txnHandler;
+ private TxnStore txnHandler;
public TestTxnHandler() throws Exception {
TxnDbUtil.setConfValues(conf);
@@ -1111,99 +1115,102 @@ public class TestTxnHandler {
@Ignore
public void deadlockDetected() throws Exception {
LOG.debug("Starting deadlock test");
- Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- Statement stmt = conn.createStatement();
- long now = txnHandler.getDbTime(conn);
- stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
- "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
- "'scooby.com')");
- stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
- "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
- "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
- txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
- "'scooby.com')");
- conn.commit();
- txnHandler.closeDbConn(conn);
-
- final AtomicBoolean sawDeadlock = new AtomicBoolean();
-
- final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- try {
+ if (txnHandler instanceof TxnHandler) {
+ final TxnHandler tHndlr = (TxnHandler)txnHandler;
+ Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Statement stmt = conn.createStatement();
+ long now = tHndlr.getDbTime(conn);
+ stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+ "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+ "'scooby.com')");
+ stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+ "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+ "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+ tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+ "'scooby.com')");
+ conn.commit();
+ tHndlr.closeDbConn(conn);
+
+ final AtomicBoolean sawDeadlock = new AtomicBoolean();
+
+ final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ try {
- for (int i = 0; i < 5; i++) {
- Thread t1 = new Thread() {
- @Override
- public void run() {
- try {
+ for (int i = 0; i < 5; i++) {
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
try {
- updateTxns(conn1);
- updateLocks(conn1);
- Thread.sleep(1000);
- conn1.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
try {
- txnHandler.checkRetryable(conn1, e, "thread t1");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
+ updateTxns(conn1);
+ updateLocks(conn1);
+ Thread.sleep(1000);
+ conn1.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ tHndlr.checkRetryable(conn1, e, "thread t1");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.RetryException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.set(true);
+ }
}
+ conn1.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- conn1.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
}
- }
- };
+ };
- Thread t2 = new Thread() {
- @Override
- public void run() {
- try {
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
try {
- updateLocks(conn2);
- updateTxns(conn2);
- Thread.sleep(1000);
- conn2.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
try {
- txnHandler.checkRetryable(conn2, e, "thread t2");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
+ updateLocks(conn2);
+ updateTxns(conn2);
+ Thread.sleep(1000);
+ conn2.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ tHndlr.checkRetryable(conn2, e, "thread t2");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.RetryException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.set(true);
+ }
}
+ conn2.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- conn2.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
}
- }
- };
-
- t1.start();
- t2.start();
- t1.join();
- t2.join();
- if (sawDeadlock.get()) break;
+ };
+
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ if (sawDeadlock.get()) break;
+ }
+ assertTrue(sawDeadlock.get());
+ } finally {
+ conn1.rollback();
+ tHndlr.closeDbConn(conn1);
+ conn2.rollback();
+ tHndlr.closeDbConn(conn2);
}
- assertTrue(sawDeadlock.get());
- } finally {
- conn1.rollback();
- txnHandler.closeDbConn(conn1);
- conn2.rollback();
- txnHandler.closeDbConn(conn2);
}
}
@@ -1236,7 +1243,7 @@ public class TestTxnHandler {
for (long i = 1; i <= 200; i++) {
inList.add(i);
}
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(1, queries.size());
runAgainstDerby(queries);
@@ -1244,7 +1251,7 @@ public class TestTxnHandler {
// The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
queries.clear();
inList.add((long)201);
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(2, queries.size());
runAgainstDerby(queries);
@@ -1255,13 +1262,13 @@ public class TestTxnHandler {
for (long i = 202; i <= 4321; i++) {
inList.add(i);
}
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(3, queries.size());
runAgainstDerby(queries);
// Case 4 - NOT IN list
queries.clear();
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
Assert.assertEquals(3, queries.size());
runAgainstDerby(queries);
@@ -1269,7 +1276,7 @@ public class TestTxnHandler {
queries.clear();
suffix.setLength(0);
suffix.append("");
- TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
Assert.assertEquals(3, queries.size());
runAgainstDerby(queries);
}
@@ -1297,7 +1304,7 @@ public class TestTxnHandler {
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
- txnHandler = new TxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
index abe1e37..6c27515 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
@@ -37,7 +37,7 @@ public class TestTxnHandlerNegative {
conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah");
RuntimeException e = null;
try {
- TxnHandler txnHandler1 = new TxnHandler(conf);
+ TxnUtils.getTxnStore(conf);
}
catch(RuntimeException ex) {
LOG.info("Expected error: " + ex.getMessage(), ex);
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index a91ca5c..59c8fe4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -18,20 +18,12 @@
package org.apache.hadoop.hive.ql.txn;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,10 +52,10 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
}
private static final class ObsoleteEntryReaper implements Runnable {
- private final CompactionTxnHandler txnHandler;
+ private final TxnStore txnHandler;
private final AtomicInteger isAliveCounter;
private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
- txnHandler = new CompactionTxnHandler(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
this.isAliveCounter = isAliveCounter;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index 38151fb..de74a7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -20,15 +20,10 @@ package org.apache.hadoop.hive.ql.txn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,10 +53,10 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
}
private static final class TimedoutTxnReaper implements Runnable {
- private final TxnHandler txnHandler;
+ private final TxnStore txnHandler;
private final AtomicInteger isAliveCounter;
private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
- txnHandler = new TxnHandler(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
this.isAliveCounter = isAliveCounter;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index c956f58..ae8865c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -50,7 +51,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
protected HiveConf conf;
- protected CompactionTxnHandler txnHandler;
+ protected TxnStore txnHandler;
protected RawStore rs;
protected int threadId;
protected AtomicBoolean stop;
@@ -75,7 +76,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
setDaemon(true); // this means the process will exit without waiting for this thread
// Get our own instance of the transaction handler
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
// Get our own connection to the database so we can get table and partition information.
rs = RawStoreProxy.getProxy(conf, conf,
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index c023c27..1898a4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -37,8 +37,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -81,7 +81,7 @@ public class Initiator extends CompactorThread {
try {//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
ValidTxnList txns =
- CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
@@ -184,7 +184,7 @@ public class Initiator extends CompactorThread {
CompactionInfo ci) {
if (compactions.getCompacts() != null) {
for (ShowCompactResponseElement e : compactions.getCompacts()) {
- if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) &&
+ if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
e.getDbname().equals(ci.dbname) &&
e.getTablename().equals(ci.tableName) &&
(e.getPartitionname() == null && ci.partName == null ||
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 59a765b..516b92e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -27,13 +27,15 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
@@ -135,7 +137,7 @@ public class Worker extends CompactorThread {
final boolean isMajor = ci.isMajorCompaction();
final ValidTxnList txns =
- CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
LOG.debug("ValidCompactTxnList: " + txns.writeToString());
txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
final StringBuilder jobName = new StringBuilder(name);
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 44b77e7..6f8dc35 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
@@ -486,7 +486,7 @@ public class TestTxnCommands2 {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
AtomicBoolean stop = new AtomicBoolean(true);
//create failed compactions
for(int i = 0; i < numFailedCompactions; i++) {
@@ -556,27 +556,27 @@ public class TestTxnCommands2 {
private int working;
private int total;
}
- private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
+ private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
CompactionsByState compactionsByState = new CompactionsByState();
compactionsByState.total = resp.getCompactsSize();
for(ShowCompactResponseElement compact : resp.getCompacts()) {
- if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
+ if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) {
compactionsByState.failed++;
}
- else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) {
compactionsByState.readyToClean++;
}
- else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) {
compactionsByState.initiated++;
}
- else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) {
compactionsByState.succeeded++;
}
- else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) {
compactionsByState.working++;
}
- else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
compactionsByState.attempted++;
}
}
@@ -632,7 +632,7 @@ public class TestTxnCommands2 {
runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
//run Worker to execute compaction
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index a4f7e5b..99705b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -42,7 +42,11 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -243,10 +247,10 @@ public class TestDbTxnManager {
}
expireLocks(txnMgr, 5);
//create a lot of locks
- for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+ for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
}
- expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+ expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
}
private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();