You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2016/01/29 20:47:44 UTC
[1/2] hive git commit: HIVE-12902 Refactor TxnHandler to be an
interface (gates reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master be9735e10 -> 1b2583ba8
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 899f5a1..a247065 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -17,15 +17,27 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
@@ -73,7 +85,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, null);
@@ -105,7 +117,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, p);
@@ -135,7 +147,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, null);
@@ -174,7 +186,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, p);
@@ -329,7 +341,7 @@ public class TestCleaner extends CompactorTest {
rsp = txnHandler.showCompact(new ShowCompactRequest());
compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Test
@@ -403,7 +415,7 @@ public class TestCleaner extends CompactorTest {
rsp = txnHandler.showCompact(new ShowCompactRequest());
compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Test
@@ -429,7 +441,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, p);
@@ -460,7 +472,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Test
@@ -488,7 +500,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
@Override
boolean useHive130DeltaDirName() {
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 03a6494..a31e2d1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -17,15 +17,27 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.Assert;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.junit.Before;
-import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
@@ -205,12 +217,12 @@ public class TestInitiator extends CompactorTest {
LockResponse res = txnHandler.lock(req);
txnHandler.abortTxn(new AbortTxnRequest(txnid));
- for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
+ for (int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
txnid = openTxn();
txnHandler.abortTxn(new AbortTxnRequest(txnid));
}
GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
- Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
+ Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
startInitiator();
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index d0db406..cf7eb70 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -17,18 +17,32 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -935,7 +949,7 @@ public class TestWorker extends CompactorTest {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
}
@Test
@@ -960,6 +974,6 @@ public class TestWorker extends CompactorTest {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+ Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
}
}
[2/2] hive git commit: HIVE-12902 Refactor TxnHandler to be an
interface (gates reviewed by Eugene Koifman)
Posted by ga...@apache.org.
HIVE-12902 Refactor TxnHandler to be an interface (gates reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1b2583ba
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1b2583ba
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1b2583ba
Branch: refs/heads/master
Commit: 1b2583ba87b80e04d861d4fa91f938ba0998fa4d
Parents: be9735e
Author: Alan Gates <ga...@hortonworks.com>
Authored: Fri Jan 29 11:46:43 2016 -0800
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Fri Jan 29 11:46:43 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 44 +--
.../hive/ql/txn/compactor/TestCompactor.java | 25 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 126 ++++++-
.../hive/metastore/HiveMetaStoreClient.java | 69 ++--
.../metastore/txn/CompactionTxnHandler.java | 46 +--
.../hadoop/hive/metastore/txn/TxnHandler.java | 26 +-
.../hadoop/hive/metastore/txn/TxnStore.java | 349 +++++++++++++++++++
.../hadoop/hive/metastore/txn/TxnUtils.java | 97 ++++++
.../metastore/txn/TestCompactionTxnHandler.java | 31 +-
.../hive/metastore/txn/TestTxnHandler.java | 195 ++++++-----
.../metastore/txn/TestTxnHandlerNegative.java | 2 +-
.../ql/txn/AcidCompactionHistoryService.java | 16 +-
.../hive/ql/txn/AcidHouseKeeperService.java | 9 +-
.../hive/ql/txn/compactor/CompactorThread.java | 11 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 12 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 15 +-
.../hive/ql/txn/compactor/CompactorTest.java | 38 +-
.../hive/ql/txn/compactor/TestCleaner.java | 40 ++-
.../hive/ql/txn/compactor/TestInitiator.java | 28 +-
.../hive/ql/txn/compactor/TestWorker.java | 32 +-
22 files changed, 914 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a78f78a..f4ca4a0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -18,25 +18,7 @@
package org.apache.hadoop.hive.conf;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.security.auth.login.LoginException;
+import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
@@ -56,7 +38,25 @@ import org.apache.hive.common.HiveCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
+import javax.security.auth.login.LoginException;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* Hive Configuration.
@@ -655,6 +655,10 @@ public class HiveConf extends Configuration {
METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
"Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" +
"This class is used to store and retrieval of raw metadata objects such as table, database"),
+ METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl",
+ "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler",
+ "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " +
+ "class is used to store and retrieve transactions and locks"),
METASTORE_CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver",
"Driver class name for a JDBC metastore"),
METASTORE_MANAGER_FACTORY_CLASS("javax.jdo.PersistenceManagerFactoryClass",
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 071a17e..568f75a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -37,10 +36,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -205,7 +204,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(4, compacts.size());
@@ -308,7 +307,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(4, compacts.size());
@@ -382,7 +381,7 @@ public class TestCompactor {
execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
tblName + " after load:");
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
@@ -517,7 +516,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
@@ -557,7 +556,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
@@ -599,7 +598,7 @@ public class TestCompactor {
initiator.init(stop, new AtomicBoolean());
initiator.run();
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
@@ -639,7 +638,7 @@ public class TestCompactor {
writeBatch(connection, writer, true);
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -701,7 +700,7 @@ public class TestCompactor {
writeBatch(connection, writer, true);
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -757,7 +756,7 @@ public class TestCompactor {
txnBatch.abort();
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
@@ -823,7 +822,7 @@ public class TestCompactor {
// Now, compact
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 7830f17..dde253a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -26,11 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
-
import org.apache.commons.cli.OptionBuilder;
-import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,9 +40,113 @@ import org.apache.hadoop.hive.common.cli.CommonCliOptions;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
+import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
+import org.apache.hadoop.hive.metastore.api.TableStatsResult;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
@@ -80,7 +180,8 @@ import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.shims.HadoopShims;
@@ -108,9 +209,10 @@ import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jdo.JDOException;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DateFormat;
@@ -229,9 +331,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
};
- private static final ThreadLocal<TxnHandler> threadLocalTxn = new ThreadLocal<TxnHandler>() {
+ private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>() {
@Override
- protected TxnHandler initialValue() {
+ protected TxnStore initialValue() {
return null;
}
};
@@ -538,10 +640,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return ms;
}
- private TxnHandler getTxnHandler() {
- TxnHandler txn = threadLocalTxn.get();
+ private TxnStore getTxnHandler() {
+ TxnStore txn = threadLocalTxn.get();
if (txn == null) {
- txn = new TxnHandler(hiveConf);
+ txn = TxnUtils.getTxnStore(hiveConf);
threadLocalTxn.set(txn);
}
return txn;
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 09a6aea..0c30262 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -18,37 +18,6 @@
package org.apache.hadoop.hive.metastore;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.isIndexTable;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.security.auth.login.LoginException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -82,9 +51,9 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FireEventRequest;
import org.apache.hadoop.hive.metastore.api.FireEventResponse;
import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
-import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
@@ -144,7 +113,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
@@ -159,6 +128,36 @@ import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.isIndexTable;
/**
* Hive Metastore Client.
@@ -1941,12 +1940,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
@Override
public ValidTxnList getValidTxns() throws TException {
- return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0);
+ return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0);
}
@Override
public ValidTxnList getValidTxns(long currentTxn) throws TException {
- return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn);
+ return TxnUtils.createValidReadTxnList(client.get_open_txns(), currentTxn);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 18b288d..70cbab7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -17,30 +17,35 @@
*/
package org.apache.hadoop.hive.metastore.txn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
/**
* Extends the transaction handler with methods needed only by the compactor threads. These
* methods are not available through the thrift interface.
*/
-public class CompactionTxnHandler extends TxnHandler {
+class CompactionTxnHandler extends TxnHandler {
static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
// Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
// See TxnHandler for notes on how to deal with deadlocks. Follow those notes.
- public CompactionTxnHandler(HiveConf conf) {
- super(conf);
+ public CompactionTxnHandler() {
}
/**
@@ -621,27 +626,6 @@ public class CompactionTxnHandler extends TxnHandler {
}
/**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
- * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
- * compact the files, and thus treats only open transactions as invalid. Additionally any
- * txnId > highestOpenTxnId is also invalid. This is avoid creating something like
- * delta_17_120 where txnId 80, for example, is still open.
- * @param txns txn list from the metastore
- * @return a valid txn list.
- */
- public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
- long highWater = txns.getTxn_high_water_mark();
- long minOpenTxn = Long.MAX_VALUE;
- long[] exceptions = new long[txns.getOpen_txnsSize()];
- int i = 0;
- for (TxnInfo txn : txns.getOpen_txns()) {
- if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
- exceptions[i++] = txn.getId();
- }
- highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
- return new ValidCompactorTxnList(exceptions, -1, highWater);
- }
- /**
* Record the highest txn id that the {@code ci} compaction job will pay attention to.
*/
public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c836f80..79c4f7a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -63,14 +63,7 @@ import java.util.concurrent.TimeUnit;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class TxnHandler {
- // Compactor states (Should really be enum)
- static final public String INITIATED_RESPONSE = "initiated";
- static final public String WORKING_RESPONSE = "working";
- static final public String CLEANING_RESPONSE = "ready for cleaning";
- static final public String FAILED_RESPONSE = "failed";
- static final public String SUCCEEDED_RESPONSE = "succeeded";
- static final public String ATTEMPTED_RESPONSE = "attempted";
+abstract class TxnHandler implements TxnStore {
static final protected char INITIATED_STATE = 'i';
static final protected char WORKING_STATE = 'w';
@@ -97,7 +90,6 @@ public class TxnHandler {
static final protected char LOCK_SEMI_SHARED = 'w';
static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
- public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
static private DataSource connPool;
@@ -109,7 +101,7 @@ public class TxnHandler {
* Number of consecutive deadlocks we have seen
*/
private int deadlockCnt;
- private final long deadlockRetryInterval;
+ private long deadlockRetryInterval;
protected HiveConf conf;
protected DatabaseProduct dbProduct;
@@ -117,8 +109,8 @@ public class TxnHandler {
private long timeout;
private String identifierQuoteString; // quotes to use for quoting tables, where necessary
- private final long retryInterval;
- private final int retryLimit;
+ private long retryInterval;
+ private int retryLimit;
private int retryNum;
// DEADLOCK DETECTION AND HANDLING
@@ -135,7 +127,10 @@ public class TxnHandler {
// in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction,
// and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
- public TxnHandler(HiveConf conf) {
+ public TxnHandler() {
+ }
+
+ public void setConf(HiveConf conf) {
this.conf = conf;
checkQFileTestHack();
@@ -155,7 +150,6 @@ public class TxnHandler {
TimeUnit.MILLISECONDS);
retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
deadlockRetryInterval = retryInterval / 10;
-
}
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -961,7 +955,7 @@ public class TxnHandler {
* For testing only, do not use.
*/
@VisibleForTesting
- int numLocksInLockTable() throws SQLException, MetaException {
+ public int numLocksInLockTable() throws SQLException, MetaException {
Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
@@ -984,7 +978,7 @@ public class TxnHandler {
/**
* For testing only, do not use.
*/
- long setTimeout(long milliseconds) {
+ public long setTimeout(long milliseconds) {
long previous_timeout = timeout;
timeout = milliseconds;
return previous_timeout;
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
new file mode 100644
index 0000000..5e0306a
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A handler to answer transaction related calls that come into the metastore
+ * server.
+ *
+ * Note on log messages: Please include txnid:X and lockid info using
+ * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
+ * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
+ * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
+ * so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat.
+ * For locks that are part of transaction, we set this 0 (would rather set it to NULL but
+ * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
+ * transaction in TXNS.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface TxnStore {
+
+ // Compactor states (Should really be enum)
+ static final public String INITIATED_RESPONSE = "initiated";
+ static final public String WORKING_RESPONSE = "working";
+ static final public String CLEANING_RESPONSE = "ready for cleaning";
+ static final public String FAILED_RESPONSE = "failed";
+ static final public String SUCCEEDED_RESPONSE = "succeeded";
+ static final public String ATTEMPTED_RESPONSE = "attempted";
+
+ public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
+
+ public void setConf(HiveConf conf);
+
+ /**
+ * Get information about open transactions. This gives extensive information about the
+ * transactions rather than just the list of transactions. This should be used when the need
+ * is to see information about the transactions (e.g. show transactions).
+ * @return information about open transactions
+ * @throws MetaException
+ */
+ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
+
+ /**
+ * Get list of valid transactions. This gives just the list of transactions that are open.
+ * @return list of open transactions, as well as a high water mark.
+ * @throws MetaException
+ */
+ public GetOpenTxnsResponse getOpenTxns() throws MetaException;
+
+ /**
+ * Open a set of transactions
+ * @param rqst request to open transactions
+ * @return information on opened transactions
+ * @throws MetaException
+ */
+ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+
+ /**
+ * Abort (rollback) a transaction.
+ * @param rqst info on transaction to abort
+ * @throws NoSuchTxnException
+ * @throws MetaException
+ */
+ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException;
+
+ /**
+ * Commit a transaction
+ * @param rqst info on transaction to commit
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public void commitTxn(CommitTxnRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Obtain a lock.
+ * @param rqst information on the lock to obtain. If the requester is part of a transaction
+ * the txn information must be included in the lock request.
+ * @return info on the lock, including whether it was obtained.
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public LockResponse lock(LockRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait
+ * state.
+ * @param rqst info on the lock to check
+ * @return info on the state of the lock
+ * @throws NoSuchTxnException
+ * @throws NoSuchLockException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public LockResponse checkLock(CheckLockRequest rqst)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+ /**
+ * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case
+ * the txn should be committed or aborted instead. (Note someday this will change since
+ * multi-statement transactions will allow unlocking in the transaction.)
+ * @param rqst lock to unlock
+ * @throws NoSuchLockException
+ * @throws TxnOpenException
+ * @throws MetaException
+ */
+ public void unlock(UnlockRequest rqst)
+ throws NoSuchLockException, TxnOpenException, MetaException;
+
+ /**
+ * Get information on current locks.
+ * @param rqst lock information to retrieve
+ * @return lock information.
+ * @throws MetaException
+ */
+ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
+
+ /**
+ * Send a heartbeat for a lock or a transaction
+ * @param ids lock and/or txn id to heartbeat
+ * @throws NoSuchTxnException
+ * @throws NoSuchLockException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public void heartbeat(HeartbeatRequest ids)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+ /**
+ * Heartbeat a group of transactions together
+ * @param rqst set of transactions to heartbat
+ * @return info on txns that were heartbeated
+ * @throws MetaException
+ */
+ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+ throws MetaException;
+
+ /**
+ * Submit a compaction request into the queue. This is called when a user manually requests a
+ * compaction.
+ * @param rqst information on what to compact
+ * @return id of the compaction that has been started
+ * @throws MetaException
+ */
+ public long compact(CompactionRequest rqst) throws MetaException;
+
+ /**
+ * Show list of current compactions
+ * @param rqst info on which compactions to show
+ * @return compaction information
+ * @throws MetaException
+ */
+ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
+
+ /**
+ * Add information on a set of dynamic partitions that participated in a transaction.
+ * @param rqst dynamic partition info.
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ public void addDynamicPartitions(AddDynamicPartitions rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Timeout transactions and/or locks. This should only be called by the compactor.
+ */
+ public void performTimeOuts();
+
+ /**
+ * This will look through the completed_txn_components table and look for partitions or tables
+ * that may be ready for compaction. Also, look through txns and txn_components tables for
+ * aborted transactions that we should add to the list.
+ * @param maxAborted Maximum number of aborted queries to allow before marking this as a
+ * potential compaction.
+ * @return list of CompactionInfo structs. These will not have id, type,
+ * or runAs set since these are only potential compactions not actual ones.
+ */
+ public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+
+ /**
+ * Sets the user to run as. This is for the case
+ * where the request was generated by the user and so the worker must set this value later.
+ * @param cq_id id of this entry in the queue
+ * @param user user to run the jobs as
+ */
+ public void setRunAs(long cq_id, String user) throws MetaException;
+
+ /**
+ * This will grab the next compaction request off of
+ * the queue, and assign it to the worker.
+ * @param workerId id of the worker calling this, will be recorded in the db
+ * @return an info element for this compaction request, or null if there is no work to do now.
+ */
+ public CompactionInfo findNextToCompact(String workerId) throws MetaException;
+
+ /**
+ * This will mark an entry in the queue as compacted
+ * and put it in the ready to clean state.
+ * @param info info on the compaction entry to mark as compacted.
+ */
+ public void markCompacted(CompactionInfo info) throws MetaException;
+
+ /**
+ * Find entries in the queue that are ready to
+ * be cleaned.
+ * @return information on the entry in the queue.
+ */
+ public List<CompactionInfo> findReadyToClean() throws MetaException;
+
+ /**
+ * This will remove an entry from the queue after
+ * it has been compacted.
+ *
+ * @param info info on the compaction entry to remove
+ */
+ public void markCleaned(CompactionInfo info) throws MetaException;
+
+ /**
+ * Mark a compaction entry as failed. This will move it to the compaction history queue with a
+ * failed status. It will NOT clean up aborted transactions in the table/partition associated
+ * with this compaction.
+ * @param info information on the compaction that failed.
+ * @throws MetaException
+ */
+ public void markFailed(CompactionInfo info) throws MetaException;
+
+ /**
+ * Clean up aborted transactions from txns that have no components in txn_components. The reson such
+ * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
+ * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+ */
+ public void cleanEmptyAbortedTxns() throws MetaException;
+
+ /**
+ * This will take all entries assigned to workers
+ * on a host return them to INITIATED state. The initiator should use this at start up to
+ * clean entries from any workers that were in the middle of compacting when the metastore
+ * shutdown. It does not reset entries from worker threads on other hosts as those may still
+ * be working.
+ * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
+ * so that like hostname% will match the worker id.
+ */
+ public void revokeFromLocalWorkers(String hostname) throws MetaException;
+
+ /**
+ * This call will return all compaction queue
+ * entries assigned to a worker but over the timeout back to the initiated state.
+ * This should be called by the initiator on start up and occasionally when running to clean up
+ * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
+ * first.
+ * @param timeout number of milliseconds since start time that should elapse before a worker is
+ * declared dead.
+ */
+ public void revokeTimedoutWorkers(long timeout) throws MetaException;
+
+ /**
+ * Queries metastore DB directly to find columns in the table which have statistics information.
+ * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+ * table level stats are examined.
+ * @throws MetaException
+ */
+ public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
+
+ /**
+ * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+ */
+ public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException;
+
+ /**
+ * For any given compactable entity (partition, table if not partitioned) the history of compactions
+ * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
+ * history such that a configurable number of each type of state is present. Any other entries
+ * can be purged. This scheme has advantage of always retaining the last failure/success even if
+ * it's not recent.
+ * @throws MetaException
+ */
+ public void purgeCompactionHistory() throws MetaException;
+
+ /**
+ * Determine if there are enough consecutive failures compacting a table or partition that no
+ * new automatic compactions should be scheduled. User initiated compactions do not do this
+ * check.
+ * @param ci Table or partition to check.
+ * @return true if it is ok to compact, false if there have been too many failures.
+ * @throws MetaException
+ */
+ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
+
+
+ @VisibleForTesting
+ public int numLocksInLockTable() throws SQLException, MetaException;
+
+ @VisibleForTesting
+ long setTimeout(long milliseconds);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
new file mode 100644
index 0000000..b7502c2
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+public class TxnUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted transactions as invalid.
+ * @param txns txn list from the metastore
+ * @param currentTxn Current transaction that the user has open. If this is greater than 0 it
+ * will be removed from the exceptions list so that the user sees his own
+ * transaction as valid.
+ * @return a valid txn list.
+ */
+ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+ long highWater = txns.getTxn_high_water_mark();
+ Set<Long> open = txns.getOpen_txns();
+ long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
+ int i = 0;
+ for(long txn: open) {
+ if (currentTxn > 0 && currentTxn == txn) continue;
+ exceptions[i++] = txn;
+ }
+ return new ValidReadTxnList(exceptions, highWater);
+ }
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+ * compact the files, and thus treats only open transactions as invalid. Additionally any
+ * txnId > highestOpenTxnId is also invalid. This is avoid creating something like
+ * delta_17_120 where txnId 80, for example, is still open.
+ * @param txns txn list from the metastore
+ * @return a valid txn list.
+ */
+ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+ long highWater = txns.getTxn_high_water_mark();
+ long minOpenTxn = Long.MAX_VALUE;
+ long[] exceptions = new long[txns.getOpen_txnsSize()];
+ int i = 0;
+ for (TxnInfo txn : txns.getOpen_txns()) {
+ if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
+ exceptions[i++] = txn.getId();
+ }
+ highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+ return new ValidCompactorTxnList(exceptions, -1, highWater);
+ }
+
+ /**
+ * Get an instance of the TxnStore that is appropriate for this store
+ * @param conf configuration
+ * @return txn store
+ */
+ public static TxnStore getTxnStore(HiveConf conf) {
+ String className = conf.getVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL);
+ try {
+ TxnStore handler = ((Class<? extends TxnHandler>) MetaStoreUtils.getClass(
+ className)).newInstance();
+ handler.setConf(conf);
+ return handler;
+ } catch (Exception e) {
+ LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index ff2c2c1..2c1560b 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -17,19 +17,6 @@
*/
package org.apache.hadoop.hive.metastore.txn;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
@@ -49,18 +36,30 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
/**
* Tests for TxnHandler.
*/
public class TestCompactionTxnHandler {
private HiveConf conf = new HiveConf();
- private CompactionTxnHandler txnHandler;
+ private TxnStore txnHandler;
public TestCompactionTxnHandler() throws Exception {
TxnDbUtil.setConfValues(conf);
@@ -440,7 +439,7 @@ public class TestCompactionTxnHandler {
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 4debd04..f0d23ba 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -17,20 +17,6 @@
*/
package org.apache.hadoop.hive.metastore.txn;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
@@ -75,6 +61,20 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
/**
* Tests for TxnHandler.
*/
@@ -83,7 +83,7 @@ public class TestTxnHandler {
private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
private HiveConf conf = new HiveConf();
- private TxnHandler txnHandler;
+ private TxnStore txnHandler;
public TestTxnHandler() throws Exception {
TxnDbUtil.setConfValues(conf);
@@ -1153,99 +1153,102 @@ public class TestTxnHandler {
@Ignore
public void deadlockDetected() throws Exception {
LOG.debug("Starting deadlock test");
- Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- Statement stmt = conn.createStatement();
- long now = txnHandler.getDbTime(conn);
- stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
- "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
- "'scooby.com')");
- stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
- "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
- "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
- txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
- "'scooby.com')");
- conn.commit();
- txnHandler.closeDbConn(conn);
-
- final AtomicBoolean sawDeadlock = new AtomicBoolean();
-
- final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- try {
+ if (txnHandler instanceof TxnHandler) {
+ final TxnHandler tHndlr = (TxnHandler)txnHandler;
+ Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Statement stmt = conn.createStatement();
+ long now = tHndlr.getDbTime(conn);
+ stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+ "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+ "'scooby.com')");
+ stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+ "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+ "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+ tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+ "'scooby.com')");
+ conn.commit();
+ tHndlr.closeDbConn(conn);
+
+ final AtomicBoolean sawDeadlock = new AtomicBoolean();
+
+ final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ try {
- for (int i = 0; i < 5; i++) {
- Thread t1 = new Thread() {
- @Override
- public void run() {
- try {
+ for (int i = 0; i < 5; i++) {
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
try {
- updateTxns(conn1);
- updateLocks(conn1);
- Thread.sleep(1000);
- conn1.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
try {
- txnHandler.checkRetryable(conn1, e, "thread t1");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
+ updateTxns(conn1);
+ updateLocks(conn1);
+ Thread.sleep(1000);
+ conn1.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ tHndlr.checkRetryable(conn1, e, "thread t1");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.RetryException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.set(true);
+ }
}
+ conn1.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- conn1.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
}
- }
- };
+ };
- Thread t2 = new Thread() {
- @Override
- public void run() {
- try {
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
try {
- updateLocks(conn2);
- updateTxns(conn2);
- Thread.sleep(1000);
- conn2.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
try {
- txnHandler.checkRetryable(conn2, e, "thread t2");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
+ updateLocks(conn2);
+ updateTxns(conn2);
+ Thread.sleep(1000);
+ conn2.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ tHndlr.checkRetryable(conn2, e, "thread t2");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.RetryException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.set(true);
+ }
}
+ conn2.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- conn2.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
}
- }
- };
-
- t1.start();
- t2.start();
- t1.join();
- t2.join();
- if (sawDeadlock.get()) break;
+ };
+
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ if (sawDeadlock.get()) break;
+ }
+ assertTrue(sawDeadlock.get());
+ } finally {
+ conn1.rollback();
+ tHndlr.closeDbConn(conn1);
+ conn2.rollback();
+ tHndlr.closeDbConn(conn2);
}
- assertTrue(sawDeadlock.get());
- } finally {
- conn1.rollback();
- txnHandler.closeDbConn(conn1);
- conn2.rollback();
- txnHandler.closeDbConn(conn2);
}
}
@@ -1262,7 +1265,7 @@ public class TestTxnHandler {
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
- txnHandler = new TxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
index a765f61..17bd01d 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
@@ -35,7 +35,7 @@ public class TestTxnHandlerNegative {
public void testBadConnection() throws Exception {
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah");
- TxnHandler txnHandler1 = new TxnHandler(conf);
+ TxnStore txnHandler1 = TxnUtils.getTxnStore(conf);
MetaException e = null;
try {
txnHandler1.getOpenTxns();
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index a91ca5c..59c8fe4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -18,20 +18,12 @@
package org.apache.hadoop.hive.ql.txn;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,10 +52,10 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
}
private static final class ObsoleteEntryReaper implements Runnable {
- private final CompactionTxnHandler txnHandler;
+ private final TxnStore txnHandler;
private final AtomicInteger isAliveCounter;
private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
- txnHandler = new CompactionTxnHandler(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
this.isAliveCounter = isAliveCounter;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index 96e4d40..882562b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -17,11 +17,12 @@
*/
package org.apache.hadoop.hive.ql.txn;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,10 +53,10 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
}
private static final class TimedoutTxnReaper implements Runnable {
- private final TxnHandler txnHandler;
+ private final TxnStore txnHandler;
private final AtomicInteger isAliveCounter;
private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
- txnHandler = new TxnHandler(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
this.isAliveCounter = isAliveCounter;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 3f6b099..8495c66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,9 +29,12 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -50,7 +51,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
protected HiveConf conf;
- protected CompactionTxnHandler txnHandler;
+ protected TxnStore txnHandler;
protected RawStore rs;
protected int threadId;
protected AtomicBoolean stop;
@@ -75,7 +76,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
setDaemon(true); // this means the process will exit without waiting for this thread
// Get our own instance of the transaction handler
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
// Get our own connection to the database so we can get table and partition information.
rs = RawStoreProxy.getProxy(conf, conf,
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 2ef06de..3705a34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,12 +34,14 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -81,7 +81,7 @@ public class Initiator extends CompactorThread {
try {//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
ValidTxnList txns =
- CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
@@ -184,7 +184,7 @@ public class Initiator extends CompactorThread {
CompactionInfo ci) {
if (compactions.getCompacts() != null) {
for (ShowCompactResponseElement e : compactions.getCompacts()) {
- if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) &&
+ if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
e.getDbname().equals(ci.dbname) &&
e.getTablename().equals(ci.tableName) &&
(e.getPartitionname() == null && ci.partName == null ||
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index ce03c8e..adffa8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -27,13 +27,15 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
@@ -135,7 +137,7 @@ public class Worker extends CompactorThread {
final boolean isMajor = ci.isMajorCompaction();
final ValidTxnList txns =
- CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
LOG.debug("ValidCompactTxnList: " + txns.writeToString());
txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
final StringBuilder jobName = new StringBuilder(name);
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index fa576fa..5ed3c44 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -31,10 +31,10 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
@@ -486,7 +486,7 @@ public class TestTxnCommands2 {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
AtomicBoolean stop = new AtomicBoolean(true);
//create failed compactions
for(int i = 0; i < numFailedCompactions; i++) {
@@ -556,27 +556,27 @@ public class TestTxnCommands2 {
private int working;
private int total;
}
- private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
+ private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
CompactionsByState compactionsByState = new CompactionsByState();
compactionsByState.total = resp.getCompactsSize();
for(ShowCompactResponseElement compact : resp.getCompacts()) {
- if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
+ if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) {
compactionsByState.failed++;
}
- else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) {
compactionsByState.readyToClean++;
}
- else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) {
compactionsByState.initiated++;
}
- else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) {
compactionsByState.succeeded++;
}
- else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) {
compactionsByState.working++;
}
- else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
+ else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
compactionsByState.attempted++;
}
}
@@ -632,7 +632,7 @@ public class TestTxnCommands2 {
runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
//run Worker to execute compaction
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
Worker t = new Worker();
t.setThreadId((int) t.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 88b379c..a3bf9d3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -38,7 +39,11 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -237,10 +242,10 @@ public class TestDbTxnManager {
}
expireLocks(txnMgr, 5);
//create a lot of locks
- for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+ for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
}
- expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+ expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
}
private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
http://git-wip-us.apache.org/repos/asf/hive/blob/1b2583ba/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index a929c95..2d1ecb5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -17,18 +17,32 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreThread;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -39,9 +53,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
@@ -62,7 +82,7 @@ public abstract class CompactorTest {
static final private String CLASS_NAME = CompactorTest.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- protected CompactionTxnHandler txnHandler;
+ protected TxnStore txnHandler;
protected IMetaStoreClient ms;
protected long sleepTime = 1000;
protected HiveConf conf;
@@ -75,7 +95,7 @@ public abstract class CompactorTest {
TxnDbUtil.setConfValues(conf);
TxnDbUtil.cleanDb();
ms = new HiveMetaStoreClient(conf);
- txnHandler = new CompactionTxnHandler(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
tmpdir = new File(System.getProperty("java.io.tmpdir") +
System.getProperty("file.separator") + "compactor_test_tables");
tmpdir.mkdir();