You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/10/13 23:32:54 UTC
svn commit: r1631562 - in /hive/branches/branch-0.14:
common/src/java/org/apache/hadoop/hive/conf/
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/
metastore/src/java/org/apache/hadoop/hive/metastore/
ql/src/java/org/apache/hadoo...
Author: gates
Date: Mon Oct 13 21:32:54 2014
New Revision: 1631562
URL: http://svn.apache.org/r1631562
Log:
HIVE-8258 Compactor cleaners can be starved on a busy table or partition. (Alan Gates reviewed by Eugene Koifman)
Modified:
hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
Modified: hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Oct 13 21:32:54 2014
@@ -1296,6 +1296,9 @@ public class HiveConf extends Configurat
"Number of aborted transactions involving a particular table or partition before major\n" +
"compaction is initiated."),
+ HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
+
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
"Whether writes to HBase should be forced to the write-ahead log. \n" +
Modified: hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (original)
+++ hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Mon Oct 13 21:32:54 2014
@@ -230,8 +230,9 @@ public class TestCompactor {
t.setThreadId((int) t.getId());
t.setHiveConf(conf);
MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+ MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer();
stop.boolVal = true;
- t.init(stop);
+ t.init(stop, looped);
t.run();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Mon Oct 13 21:32:54 2014
@@ -5933,7 +5933,7 @@ public class HiveMetaStore extends Thrif
LOG.info("Starting metastore thread of type " + thread.getClass().getName());
thread.setHiveConf(conf);
thread.setThreadId(nextThreadId++);
- thread.init(new MetaStoreThread.BooleanPointer());
+ thread.init(new MetaStoreThread.BooleanPointer(), new MetaStoreThread.BooleanPointer());
thread.start();
}
}
Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java (original)
+++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java Mon Oct 13 21:32:54 2014
@@ -43,8 +43,13 @@ public interface MetaStoreThread {
* have been called.
* @param stop a flag to watch for when to stop. If this value is set to true,
* the thread will terminate the next time through its main loop.
+ * @param looped a flag that is set to true every time a thread goes through its main loop.
+ * This is purely for testing so that tests can assure themselves that the thread
+ * has run through its loop once. The test can set this value to false. The
+ * thread should then ensure that it has gone completely through the loop at
+ * least once before setting the value back to true.
*/
- void init(BooleanPointer stop) throws MetaException;
+ void init(BooleanPointer stop, BooleanPointer looped) throws MetaException;
/**
* Run the thread in the background. This must not be called until
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Oct 13 21:32:54 2014
@@ -348,7 +348,7 @@ public class AcidUtils {
long bestBaseTxn = 0;
final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
List<ParsedDelta> working = new ArrayList<ParsedDelta>();
- final List<FileStatus> original = new ArrayList<FileStatus>();
+ List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
final List<FileStatus> obsolete = new ArrayList<FileStatus>();
List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory,
hiddenFileFilter);
@@ -375,16 +375,26 @@ public class AcidUtils {
working.add(delta);
}
} else {
- findOriginals(fs, child, original);
+ // This is just the directory. We need to recurse and find the actual files. But don't
+ // do this until we have determined there is no base. This saves time. Plus,
+ // it is possible that the cleaner is running and removing these original files,
+ // in which case recursing through them could cause us to get an error.
+ originalDirectories.add(child);
}
}
+ final List<FileStatus> original = new ArrayList<FileStatus>();
// if we have a base, the original files are obsolete.
if (bestBase != null) {
- obsolete.addAll(original);
// remove the entries so we don't get confused later and think we should
// use them.
original.clear();
+ } else {
+ // Okay, we're going to need these originals. Recurse through them and figure out what we
+ // really need.
+ for (FileStatus origDir : originalDirectories) {
+ findOriginals(fs, origDir, original);
+ }
}
Collections.sort(working);
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Mon Oct 13 21:32:54 2014
@@ -24,15 +24,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnListImpl;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -41,7 +38,12 @@ import org.apache.hadoop.util.StringUtil
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* A class to clean directories after compactions. This will run in a separate thread.
@@ -50,35 +52,85 @@ public class Cleaner extends CompactorTh
static final private String CLASS_NAME = Cleaner.class.getName();
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
- private long cleanerCheckInterval = 5000;
+ private long cleanerCheckInterval = 0;
+
+ // List of compactions to clean.
+ private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();
+ private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>();
@Override
public void run() {
- // Make sure nothing escapes this run method and kills the metastore at large,
- // so wrap it in a big catch Throwable statement.
+ if (cleanerCheckInterval == 0) {
+ cleanerCheckInterval = conf.getTimeVar(
+ HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
do {
+ // This is solely for testing. It checks if the test has set the looped value to false,
+ // and if so remembers that and then sets it to true at the end. We have to check here
+ // first to make sure we go through a complete iteration of the loop before resetting it.
+ boolean setLooped = !looped.boolVal;
+ // Make sure nothing escapes this run method and kills the metastore at large,
+ // so wrap it in a big catch Throwable statement.
try {
long startedAt = System.currentTimeMillis();
- // Now look for new entries ready to be cleaned.
+ // First look for all the compactions that are waiting to be cleaned. If we have not
+ // seen an entry before, look for all the locks held on that table or partition and
+ // record them. We will then only clean the partition once all of those locks have been
+ // released. This way we avoid removing the files while they are in use,
+ // while at the same time avoiding starving the cleaner as new readers come along.
+ // This works because we know that any reader who comes along after the worker thread has
+ // done the compaction will read the more up to date version of the data (either in a
+ // newer delta or in a newer base).
List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- for (CompactionInfo ci : toClean) {
- LockComponent comp = null;
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname);
- comp.setTablename(ci.tableName);
- if (ci.partName != null) comp.setPartitionname(ci.partName);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest rqst = new LockRequest(components, System.getProperty("user.name"),
- Worker.hostname());
- LockResponse rsp = txnHandler.lockNoWait(rqst);
+ if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
+ ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
+
+ for (CompactionInfo ci : toClean) {
+ // Check to see if we have seen this request before. If so, ignore it. If not,
+ // add it to our queue.
+ if (!compactId2LockMap.containsKey(ci.id)) {
+ compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse));
+ compactId2CompactInfoMap.put(ci.id, ci);
+ }
+ }
+
+ // Now, for each entry in the queue, see if all of the associated locks are clear so we
+ // can clean
+ Set<Long> currentLocks = buildCurrentLockSet(locksResponse);
+ List<Long> expiredLocks = new ArrayList<Long>();
+ List<Long> compactionsCleaned = new ArrayList<Long>();
try {
- if (rsp.getState() == LockState.ACQUIRED) {
- clean(ci);
+ for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) {
+ boolean sawLock = false;
+ for (Long lockId : queueEntry.getValue()) {
+ if (currentLocks.contains(lockId)) {
+ sawLock = true;
+ break;
+ } else {
+ expiredLocks.add(lockId);
+ }
+ }
+
+ if (!sawLock) {
+ // Remember to remove this when we're out of the loop,
+ // we can't do it in the loop or we'll get a concurrent modification exception.
+ compactionsCleaned.add(queueEntry.getKey());
+ clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
+ } else {
+ // Remove the locks we didn't see so we don't look for them again next time
+ for (Long lockId : expiredLocks) {
+ queueEntry.getValue().remove(lockId);
+ }
+ }
}
} finally {
- if (rsp.getState() == LockState.ACQUIRED) {
- txnHandler.unlock(new UnlockRequest(rsp.getLockid()));
+ if (compactionsCleaned.size() > 0) {
+ for (Long compactId : compactionsCleaned) {
+ compactId2LockMap.remove(compactId);
+ compactId2CompactInfoMap.remove(compactId);
+ }
}
}
}
@@ -91,9 +143,37 @@ public class Cleaner extends CompactorTh
LOG.error("Caught an exception in the main loop of compactor cleaner, " +
StringUtils.stringifyException(t));
}
+ if (setLooped) {
+ looped.boolVal = true;
+ }
} while (!stop.boolVal);
}
+ private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
+ Set<Long> relatedLocks = new HashSet<Long>();
+ for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+ if (ci.dbname.equals(lock.getDbname())) {
+ if ((ci.tableName == null && lock.getTablename() == null) ||
+ (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) {
+ if ((ci.partName == null && lock.getPartname() == null) ||
+ (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
+ relatedLocks.add(lock.getLockid());
+ }
+ }
+ }
+ }
+
+ return relatedLocks;
+ }
+
+ private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) {
+ Set<Long> currentLocks = new HashSet<Long>(locksResponse.getLocks().size());
+ for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+ currentLocks.add(lock.getLockid());
+ }
+ return currentLocks;
+ }
+
private void clean(CompactionInfo ci) throws MetaException {
LOG.info("Starting cleaning for " + ci.getFullPartitionName());
try {
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java Mon Oct 13 21:32:54 2014
@@ -53,6 +53,7 @@ abstract class CompactorThread extends T
protected RawStore rs;
protected int threadId;
protected BooleanPointer stop;
+ protected BooleanPointer looped;
@Override
public void setHiveConf(HiveConf conf) {
@@ -66,8 +67,9 @@ abstract class CompactorThread extends T
}
@Override
- public void init(BooleanPointer stop) throws MetaException {
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
this.stop = stop;
+ this.looped = looped;
setPriority(MIN_PRIORITY);
setDaemon(true); // this means the process will exit without waiting for this thread
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Mon Oct 13 21:32:54 2014
@@ -137,8 +137,8 @@ public class Initiator extends Compactor
}
@Override
- public void init(BooleanPointer stop) throws MetaException {
- super.init(stop);
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+ super.init(stop, looped);
checkInterval =
conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;
}
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Mon Oct 13 21:32:54 2014
@@ -168,8 +168,8 @@ public class Worker extends CompactorThr
}
@Override
- public void init(BooleanPointer stop) throws MetaException {
- super.init(stop);
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+ super.init(stop, looped);
StringBuilder name = new StringBuilder(hostname());
name.append("-");
Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Mon Oct 13 21:32:54 2014
@@ -217,11 +217,11 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "/tbl/part1");
AcidUtils.Directory dir =
AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+ // The two original buckets won't be in the obsolete list because we don't look at those
+ // until we have determined there is no base.
List<FileStatus> obsolete = dir.getObsolete();
- assertEquals(3, obsolete.size());
+ assertEquals(1, obsolete.size());
assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString());
- assertEquals("mock:/tbl/part1/000000_0", obsolete.get(1).getPath().toString());
- assertEquals("mock:/tbl/part1/000001_1", obsolete.get(2).getPath().toString());
assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
}
Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java Mon Oct 13 21:32:54 2014
@@ -63,12 +63,13 @@ public abstract class CompactorTest {
protected CompactionTxnHandler txnHandler;
protected IMetaStoreClient ms;
protected long sleepTime = 1000;
+ protected HiveConf conf;
private final MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
private final File tmpdir;
protected CompactorTest() throws Exception {
- HiveConf conf = new HiveConf();
+ conf = new HiveConf();
TxnDbUtil.setConfValues(conf);
TxnDbUtil.cleanDb();
ms = new HiveMetaStoreClient(conf);
@@ -79,16 +80,20 @@ public abstract class CompactorTest {
tmpdir.deleteOnExit();
}
- protected void startInitiator(HiveConf conf) throws Exception {
- startThread('i', conf);
+ protected void startInitiator() throws Exception {
+ startThread('i', true);
}
- protected void startWorker(HiveConf conf) throws Exception {
- startThread('w', conf);
+ protected void startWorker() throws Exception {
+ startThread('w', true);
}
- protected void startCleaner(HiveConf conf) throws Exception {
- startThread('c', conf);
+ protected void startCleaner() throws Exception {
+ startThread('c', true);
+ }
+
+ protected void startCleaner(MetaStoreThread.BooleanPointer looped) throws Exception {
+ startThread('c', false, looped);
}
protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
@@ -117,6 +122,9 @@ public abstract class CompactorTest {
table.setParameters(parameters);
+ // drop the table first, in case some previous test created it
+ ms.dropTable(dbName, tableName);
+
ms.createTable(table);
return table;
}
@@ -142,37 +150,27 @@ public abstract class CompactorTest {
return txns.get(0);
}
- protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
- int numRecords) throws Exception{
- addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
+ protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
+ throws Exception {
+ addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
}
- protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
- int numRecords) throws Exception{
- addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
+ protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception {
+ addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
}
- protected void addLegacyFile(HiveConf conf, Table t, Partition p,
- int numRecords) throws Exception {
- addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
+ protected void addLegacyFile(Table t, Partition p, int numRecords) throws Exception {
+ addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
}
- protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
- int numRecords, int numBuckets, boolean allBucketsPresent)
- throws Exception {
- addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
+ protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
+ int numBuckets, boolean allBucketsPresent) throws Exception {
+ addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
}
- protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
- int numRecords, int numBuckets, boolean allBucketsPresent)
- throws Exception {
- addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
- }
-
- protected void addLegacyFile(HiveConf conf, Table t, Partition p,
- int numRecords, int numBuckets, boolean allBucketsPresent)
- throws Exception {
- addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, numBuckets, allBucketsPresent);
+ protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, int numBuckets,
+ boolean allBucketsPresent) throws Exception {
+ addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
}
protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
@@ -191,6 +189,10 @@ public abstract class CompactorTest {
for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid));
}
+ protected void stopThread() {
+ stop.boolVal = true;
+ }
+
private StorageDescriptor newStorageDescriptor(String location, List<Order> sortCols) {
StorageDescriptor sd = new StorageDescriptor();
List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
@@ -214,9 +216,13 @@ public abstract class CompactorTest {
return sd;
}
- // I can't do this with @Before because I want to be able to control the config file provided
- // to each test.
- private void startThread(char type, HiveConf conf) throws Exception {
+ // I can't do this with @Before because I want to be able to control when the thread starts
+ private void startThread(char type, boolean stopAfterOne) throws Exception {
+ startThread(type, stopAfterOne, new MetaStoreThread.BooleanPointer());
+ }
+
+ private void startThread(char type, boolean stopAfterOne, MetaStoreThread.BooleanPointer looped)
+ throws Exception {
TxnDbUtil.setConfValues(conf);
CompactorThread t = null;
switch (type) {
@@ -227,9 +233,10 @@ public abstract class CompactorTest {
}
t.setThreadId((int) t.getId());
t.setHiveConf(conf);
- stop.boolVal = true;
- t.init(stop);
- t.run();
+ stop.boolVal = stopAfterOne;
+ t.init(stop, looped);
+ if (stopAfterOne) t.run();
+ else t.start();
}
private String getLocation(String tableName, String partValue) {
@@ -243,7 +250,7 @@ public abstract class CompactorTest {
private enum FileType {BASE, DELTA, LEGACY};
- private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
int numRecords, FileType type, int numBuckets,
boolean allBucketsPresent) throws Exception {
String partValue = (p == null) ? null : p.getValues().get(0);
Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java Mon Oct 13 21:32:54 2014
@@ -18,21 +18,26 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* Tests for the compactor Cleaner thread
*/
public class TestCleaner extends CompactorTest {
+
+ static final private Log LOG = LogFactory.getLog(TestCleaner.class.getName());
+
public TestCleaner() throws Exception {
super();
}
@@ -41,19 +46,17 @@ public class TestCleaner extends Compact
public void nothing() throws Exception {
// Test that the whole things works when there's nothing in the queue. This is just a
// survival test.
- startCleaner(new HiveConf());
+ startCleaner();
}
@Test
public void cleanupAfterMajorTableCompaction() throws Exception {
Table t = newTable("default", "camtc", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addBaseFile(conf, t, null, 25L, 25);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 25L, 25);
burnThroughTransactions(25);
@@ -63,7 +66,7 @@ public class TestCleaner extends Compact
txnHandler.markCompacted(ci);
txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
- startCleaner(conf);
+ startCleaner();
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -80,12 +83,10 @@ public class TestCleaner extends Compact
Table t = newTable("default", "campc", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addBaseFile(conf, t, p, 25L, 25);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 25L, 25);
burnThroughTransactions(25);
@@ -96,7 +97,7 @@ public class TestCleaner extends Compact
txnHandler.markCompacted(ci);
txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
- startCleaner(conf);
+ startCleaner();
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -112,12 +113,10 @@ public class TestCleaner extends Compact
public void cleanupAfterMinorTableCompaction() throws Exception {
Table t = newTable("default", "camitc", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addDeltaFile(conf, t, null, 21L, 24L, 4);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);
burnThroughTransactions(25);
@@ -127,7 +126,7 @@ public class TestCleaner extends Compact
txnHandler.markCompacted(ci);
txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
- startCleaner(conf);
+ startCleaner();
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -151,12 +150,10 @@ public class TestCleaner extends Compact
Table t = newTable("default", "camipc", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addDeltaFile(conf, t, p, 21L, 24L, 4);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);
burnThroughTransactions(25);
@@ -167,7 +164,7 @@ public class TestCleaner extends Compact
txnHandler.markCompacted(ci);
txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
- startCleaner(conf);
+ startCleaner();
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -190,12 +187,10 @@ public class TestCleaner extends Compact
public void blockedByLockTable() throws Exception {
Table t = newTable("default", "bblt", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addDeltaFile(conf, t, null, 21L, 24L, 4);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);
burnThroughTransactions(25);
@@ -212,7 +207,7 @@ public class TestCleaner extends Compact
LockRequest req = new LockRequest(components, "me", "localhost");
LockResponse res = txnHandler.lock(req);
- startCleaner(conf);
+ startCleaner();
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -228,12 +223,10 @@ public class TestCleaner extends Compact
Table t = newTable("default", "bblp", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addDeltaFile(conf, t, p, 21L, 24L, 4);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);
burnThroughTransactions(25);
@@ -244,7 +237,7 @@ public class TestCleaner extends Compact
txnHandler.markCompacted(ci);
txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
comp.setTablename("bblp");
comp.setPartitionname("ds=today");
List<LockComponent> components = new ArrayList<LockComponent>(1);
@@ -252,7 +245,7 @@ public class TestCleaner extends Compact
LockRequest req = new LockRequest(components, "me", "localhost");
LockResponse res = txnHandler.lock(req);
- startCleaner(conf);
+ startCleaner();
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -265,15 +258,154 @@ public class TestCleaner extends Compact
}
@Test
+ public void notBlockedBySubsequentLock() throws Exception {
+ // Checks that a lock taken after the clean request was queued does not stop the cleaner
+ // from removing the request once the lock that predates it has been released.
+ Table t = newTable("default", "bblt", false);
+
+ // Set the run frequency low on this test so it doesn't take long
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100,
+ TimeUnit.MILLISECONDS);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
+ comp.setTablename("bblt");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+
+ MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer();
+ looped.boolVal = false;
+ startCleaner(looped);
+
+ // Make sure the cleaner has a chance to run once
+ while (!looped.boolVal) {
+ Thread.sleep(100);
+ }
+
+ // There should still be one request, as the lock is still held.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+
+ // Obtain a second lock. This shouldn't block the cleaner as it was acquired after the
+ // initial clean request. It must be requested through its own component list.
+ LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
+ comp2.setTablename("bblt");
+ List<LockComponent> components2 = new ArrayList<LockComponent>(1);
+ components2.add(comp2);
+ LockRequest req2 = new LockRequest(components2, "me", "localhost");
+ LockResponse res2 = txnHandler.lock(req2);
+
+ // Unlock the previous lock, then reset the flag and wait for the cleaner to loop again
+ txnHandler.unlock(new UnlockRequest(res.getLockid()));
+ looped.boolVal = false;
+ while (!looped.boolVal) {
+ Thread.sleep(100);
+ }
+ stopThread();
+ Thread.sleep(200);
+
+ // Check there are no compaction requests left.
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ compacts = rsp.getCompacts();
+ Assert.assertEquals(0, compacts.size());
+ }
+
+ @Test
+ public void partitionNotBlockedBySubsequentLock() throws Exception {
+ // Partition-level variant: checks that a lock taken on the partition after the clean
+ // request was queued does not stop the cleaner from removing the request once the lock
+ // that predates it has been released.
+ Table t = newTable("default", "bblt", true);
+ Partition p = newPartition(t, "today");
+
+ // Set the run frequency low on this test so it doesn't take long
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100,
+ TimeUnit.MILLISECONDS);
+
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default");
+ comp.setTablename("bblt");
+ comp.setPartitionname("ds=today");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+
+ MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer();
+ looped.boolVal = false;
+ startCleaner(looped);
+
+ // Make sure the cleaner has a chance to run once
+ while (!looped.boolVal) {
+ Thread.sleep(100);
+ }
+
+ // There should still be one request, as the lock is still held.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+
+ // Obtain a second lock. This shouldn't block the cleaner as it was acquired after the
+ // initial clean request. It must be requested through its own component list.
+ LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default");
+ comp2.setTablename("bblt");
+ comp2.setPartitionname("ds=today");
+ List<LockComponent> components2 = new ArrayList<LockComponent>(1);
+ components2.add(comp2);
+ LockRequest req2 = new LockRequest(components2, "me", "localhost");
+ LockResponse res2 = txnHandler.lock(req2);
+
+ // Unlock the previous lock, then reset the flag and wait for the cleaner to loop again
+ txnHandler.unlock(new UnlockRequest(res.getLockid()));
+ looped.boolVal = false;
+ while (!looped.boolVal) {
+ Thread.sleep(100);
+ }
+ stopThread();
+ Thread.sleep(200);
+
+ // Check there are no compaction requests left.
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ compacts = rsp.getCompacts();
+ Assert.assertEquals(0, compacts.size());
+ }
+
+ @Test
public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
Table t = newTable("default", "campcnb", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, p, 1L, 22L, 22);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addBaseFile(conf, t, p, 25L, 25);
+ addDeltaFile(t, p, 1L, 22L, 22);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 25L, 25);
burnThroughTransactions(25);
@@ -284,7 +416,7 @@ public class TestCleaner extends Compact
txnHandler.markCompacted(ci);
txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
- startCleaner(conf);
+ startCleaner();
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -295,9 +427,4 @@ public class TestCleaner extends Compact
Assert.assertEquals(1, paths.size());
Assert.assertEquals("base_25", paths.get(0).getName());
}
-
- @Before
- public void setUpTxnDb() throws Exception {
- TxnDbUtil.setConfValues(new HiveConf());
- }
}
Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java Mon Oct 13 21:32:54 2014
@@ -47,7 +47,7 @@ public class TestInitiator extends Compa
public void nothing() throws Exception {
// Test that the whole things works when there's nothing in the queue. This is just a
// survival test.
- startInitiator(new HiveConf());
+ startInitiator();
}
@Test
@@ -63,7 +63,7 @@ public class TestInitiator extends Compa
txnHandler.findNextToCompact(Worker.hostname() + "-193892");
txnHandler.findNextToCompact("nosuchhost-193892");
- startInitiator(new HiveConf());
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -89,10 +89,9 @@ public class TestInitiator extends Compa
txnHandler.findNextToCompact("nosuchhost-193892");
- HiveConf conf = new HiveConf();
conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -104,7 +103,6 @@ public class TestInitiator extends Compa
public void majorCompactOnTableTooManyAborts() throws Exception {
Table t = newTable("default", "mcottma", false);
- HiveConf conf = new HiveConf();
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
for (int i = 0; i < 11; i++) {
@@ -119,7 +117,7 @@ public class TestInitiator extends Compa
txnHandler.abortTxn(new AbortTxnRequest(txnid));
}
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -134,7 +132,6 @@ public class TestInitiator extends Compa
Table t = newTable("default", "mcoptma", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
for (int i = 0; i < 11; i++) {
@@ -150,7 +147,7 @@ public class TestInitiator extends Compa
txnHandler.abortTxn(new AbortTxnRequest(txnid));
}
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -168,7 +165,6 @@ public class TestInitiator extends Compa
Partition p = newPartition(t, "day-" + i);
}
- HiveConf conf = new HiveConf();
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
for (int i = 0; i < 11; i++) {
@@ -184,7 +180,7 @@ public class TestInitiator extends Compa
txnHandler.abortTxn(new AbortTxnRequest(txnid));
}
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(0, rsp.getCompactsSize());
@@ -197,8 +193,6 @@ public class TestInitiator extends Compa
// accidently clean it too.
Table t = newTable("default", "ceat", false);
- HiveConf conf = new HiveConf();
-
long txnid = openTxn();
LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
comp.setTablename("ceat");
@@ -216,7 +210,7 @@ public class TestInitiator extends Compa
GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
Assert.assertEquals(101, openTxns.getOpen_txnsSize());
- startInitiator(conf);
+ startInitiator();
openTxns = txnHandler.getOpenTxns();
Assert.assertEquals(1, openTxns.getOpen_txnsSize());
@@ -228,7 +222,6 @@ public class TestInitiator extends Compa
parameters.put("NO_AUTO_COMPACTION", "true");
Table t = newTable("default", "ncwncs", false, parameters);
- HiveConf conf = new HiveConf();
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
for (int i = 0; i < 11; i++) {
@@ -243,7 +236,7 @@ public class TestInitiator extends Compa
txnHandler.abortTxn(new AbortTxnRequest(txnid));
}
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(0, rsp.getCompactsSize());
@@ -253,7 +246,6 @@ public class TestInitiator extends Compa
public void noCompactWhenCompactAlreadyScheduled() throws Exception {
Table t = newTable("default", "ncwcas", false);
- HiveConf conf = new HiveConf();
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
for (int i = 0; i < 11; i++) {
@@ -277,7 +269,7 @@ public class TestInitiator extends Compa
Assert.assertEquals("initiated", compacts.get(0).getState());
Assert.assertEquals("ncwcas", compacts.get(0).getTablename());
- startInitiator(conf);
+ startInitiator();
rsp = txnHandler.showCompact(new ShowCompactRequest());
compacts = rsp.getCompacts();
@@ -291,11 +283,9 @@ public class TestInitiator extends Compa
public void compactTableHighDeltaPct() throws Exception {
Table t = newTable("default", "cthdp", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
burnThroughTransactions(23);
@@ -309,7 +299,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -324,11 +314,9 @@ public class TestInitiator extends Compa
Table t = newTable("default", "cphdp", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
burnThroughTransactions(23);
@@ -343,7 +331,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -358,11 +346,9 @@ public class TestInitiator extends Compa
public void noCompactTableDeltaPctNotHighEnough() throws Exception {
Table t = newTable("default", "nctdpnhe", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 50L, 50);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 50L, 50);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
burnThroughTransactions(53);
@@ -376,7 +362,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(0, rsp.getCompactsSize());
@@ -386,20 +372,18 @@ public class TestInitiator extends Compa
public void compactTableTooManyDeltas() throws Exception {
Table t = newTable("default", "cttmd", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 200L, 200);
- addDeltaFile(conf, t, null, 201L, 201L, 1);
- addDeltaFile(conf, t, null, 202L, 202L, 1);
- addDeltaFile(conf, t, null, 203L, 203L, 1);
- addDeltaFile(conf, t, null, 204L, 204L, 1);
- addDeltaFile(conf, t, null, 205L, 205L, 1);
- addDeltaFile(conf, t, null, 206L, 206L, 1);
- addDeltaFile(conf, t, null, 207L, 207L, 1);
- addDeltaFile(conf, t, null, 208L, 208L, 1);
- addDeltaFile(conf, t, null, 209L, 209L, 1);
- addDeltaFile(conf, t, null, 210L, 210L, 1);
- addDeltaFile(conf, t, null, 211L, 211L, 1);
+ addBaseFile(t, null, 200L, 200);
+ addDeltaFile(t, null, 201L, 201L, 1);
+ addDeltaFile(t, null, 202L, 202L, 1);
+ addDeltaFile(t, null, 203L, 203L, 1);
+ addDeltaFile(t, null, 204L, 204L, 1);
+ addDeltaFile(t, null, 205L, 205L, 1);
+ addDeltaFile(t, null, 206L, 206L, 1);
+ addDeltaFile(t, null, 207L, 207L, 1);
+ addDeltaFile(t, null, 208L, 208L, 1);
+ addDeltaFile(t, null, 209L, 209L, 1);
+ addDeltaFile(t, null, 210L, 210L, 1);
+ addDeltaFile(t, null, 211L, 211L, 1);
burnThroughTransactions(210);
@@ -413,7 +397,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -428,20 +412,18 @@ public class TestInitiator extends Compa
Table t = newTable("default", "cptmd", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 200L, 200);
- addDeltaFile(conf, t, p, 201L, 201L, 1);
- addDeltaFile(conf, t, p, 202L, 202L, 1);
- addDeltaFile(conf, t, p, 203L, 203L, 1);
- addDeltaFile(conf, t, p, 204L, 204L, 1);
- addDeltaFile(conf, t, p, 205L, 205L, 1);
- addDeltaFile(conf, t, p, 206L, 206L, 1);
- addDeltaFile(conf, t, p, 207L, 207L, 1);
- addDeltaFile(conf, t, p, 208L, 208L, 1);
- addDeltaFile(conf, t, p, 209L, 209L, 1);
- addDeltaFile(conf, t, p, 210L, 210L, 1);
- addDeltaFile(conf, t, p, 211L, 211L, 1);
+ addBaseFile(t, p, 200L, 200);
+ addDeltaFile(t, p, 201L, 201L, 1);
+ addDeltaFile(t, p, 202L, 202L, 1);
+ addDeltaFile(t, p, 203L, 203L, 1);
+ addDeltaFile(t, p, 204L, 204L, 1);
+ addDeltaFile(t, p, 205L, 205L, 1);
+ addDeltaFile(t, p, 206L, 206L, 1);
+ addDeltaFile(t, p, 207L, 207L, 1);
+ addDeltaFile(t, p, 208L, 208L, 1);
+ addDeltaFile(t, p, 209L, 209L, 1);
+ addDeltaFile(t, p, 210L, 210L, 1);
+ addDeltaFile(t, p, 211L, 211L, 1);
burnThroughTransactions(210);
@@ -456,7 +438,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -471,11 +453,9 @@ public class TestInitiator extends Compa
public void noCompactTableNotEnoughDeltas() throws Exception {
Table t = newTable("default", "nctned", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 200L, 200);
- addDeltaFile(conf, t, null, 201L, 205L, 5);
- addDeltaFile(conf, t, null, 206L, 211L, 6);
+ addBaseFile(t, null, 200L, 200);
+ addDeltaFile(t, null, 201L, 205L, 5);
+ addDeltaFile(t, null, 206L, 211L, 6);
burnThroughTransactions(210);
@@ -489,7 +469,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(0, rsp.getCompactsSize());
@@ -499,20 +479,18 @@ public class TestInitiator extends Compa
public void chooseMajorOverMinorWhenBothValid() throws Exception {
Table t = newTable("default", "cmomwbv", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 200L, 200);
- addDeltaFile(conf, t, null, 201L, 211L, 11);
- addDeltaFile(conf, t, null, 212L, 222L, 11);
- addDeltaFile(conf, t, null, 223L, 233L, 11);
- addDeltaFile(conf, t, null, 234L, 244L, 11);
- addDeltaFile(conf, t, null, 245L, 255L, 11);
- addDeltaFile(conf, t, null, 256L, 266L, 11);
- addDeltaFile(conf, t, null, 267L, 277L, 11);
- addDeltaFile(conf, t, null, 278L, 288L, 11);
- addDeltaFile(conf, t, null, 289L, 299L, 11);
- addDeltaFile(conf, t, null, 300L, 310L, 11);
- addDeltaFile(conf, t, null, 311L, 321L, 11);
+ addBaseFile(t, null, 200L, 200);
+ addDeltaFile(t, null, 201L, 211L, 11);
+ addDeltaFile(t, null, 212L, 222L, 11);
+ addDeltaFile(t, null, 223L, 233L, 11);
+ addDeltaFile(t, null, 234L, 244L, 11);
+ addDeltaFile(t, null, 245L, 255L, 11);
+ addDeltaFile(t, null, 256L, 266L, 11);
+ addDeltaFile(t, null, 267L, 277L, 11);
+ addDeltaFile(t, null, 278L, 288L, 11);
+ addDeltaFile(t, null, 289L, 299L, 11);
+ addDeltaFile(t, null, 300L, 310L, 11);
+ addDeltaFile(t, null, 311L, 321L, 11);
burnThroughTransactions(320);
@@ -526,7 +504,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -541,19 +519,17 @@ public class TestInitiator extends Compa
Table t = newTable("default", "ednb", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, p, 1L, 201L, 200);
- addDeltaFile(conf, t, p, 202L, 202L, 1);
- addDeltaFile(conf, t, p, 203L, 203L, 1);
- addDeltaFile(conf, t, p, 204L, 204L, 1);
- addDeltaFile(conf, t, p, 205L, 205L, 1);
- addDeltaFile(conf, t, p, 206L, 206L, 1);
- addDeltaFile(conf, t, p, 207L, 207L, 1);
- addDeltaFile(conf, t, p, 208L, 208L, 1);
- addDeltaFile(conf, t, p, 209L, 209L, 1);
- addDeltaFile(conf, t, p, 210L, 210L, 1);
- addDeltaFile(conf, t, p, 211L, 211L, 1);
+ addDeltaFile(t, p, 1L, 201L, 200);
+ addDeltaFile(t, p, 202L, 202L, 1);
+ addDeltaFile(t, p, 203L, 203L, 1);
+ addDeltaFile(t, p, 204L, 204L, 1);
+ addDeltaFile(t, p, 205L, 205L, 1);
+ addDeltaFile(t, p, 206L, 206L, 1);
+ addDeltaFile(t, p, 207L, 207L, 1);
+ addDeltaFile(t, p, 208L, 208L, 1);
+ addDeltaFile(t, p, 209L, 209L, 1);
+ addDeltaFile(t, p, 210L, 210L, 1);
+ addDeltaFile(t, p, 211L, 211L, 1);
burnThroughTransactions(210);
@@ -568,7 +544,7 @@ public class TestInitiator extends Compa
LockResponse res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -584,11 +560,9 @@ public class TestInitiator extends Compa
Table t = newTable("default", "ttospgocr", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
burnThroughTransactions(23);
@@ -614,7 +588,7 @@ public class TestInitiator extends Compa
res = txnHandler.lock(req);
txnHandler.commitTxn(new CommitTxnRequest(txnid));
- startInitiator(conf);
+ startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -626,9 +600,4 @@ public class TestInitiator extends Compa
}
// TODO test compactions with legacy file types
-
- @Before
- public void setUpTxnDb() throws Exception {
- TxnDbUtil.setConfValues(new HiveConf());
- }
}
Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Mon Oct 13 21:32:54 2014
@@ -48,7 +48,7 @@ public class TestWorker extends Compacto
public void nothing() throws Exception {
// Test that the whole things works when there's nothing in the queue. This is just a
// survival test.
- startWorker(new HiveConf());
+ startWorker();
}
@Test
@@ -205,19 +205,17 @@ public class TestWorker extends Compacto
Table t = newTable("default", "st", false, new HashMap<String, String>(), sortCols);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addDeltaFile(conf, t, null, 21L, 24L, 4);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);
burnThroughTransactions(25);
CompactionRequest rqst = new CompactionRequest("default", "st", CompactionType.MINOR);
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
@@ -232,12 +230,11 @@ public class TestWorker extends Compacto
Table t = newTable("default", "sp", true, new HashMap<String, String>(), sortCols);
Partition p = newPartition(t, "today", sortCols);
- HiveConf conf = new HiveConf();
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addDeltaFile(conf, t, p, 21L, 24L, 4);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);
burnThroughTransactions(25);
@@ -245,7 +242,7 @@ public class TestWorker extends Compacto
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
@@ -258,18 +255,16 @@ public class TestWorker extends Compacto
LOG.debug("Starting minorTableWithBase");
Table t = newTable("default", "mtwb", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
burnThroughTransactions(25);
CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
txnHandler.compact(rqst);
- startWorker(conf);
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -304,11 +299,10 @@ public class TestWorker extends Compacto
public void minorPartitionWithBase() throws Exception {
Table t = newTable("default", "mpwb", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
burnThroughTransactions(25);
@@ -316,7 +310,7 @@ public class TestWorker extends Compacto
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -351,17 +345,15 @@ public class TestWorker extends Compacto
LOG.debug("Starting minorTableWithBase");
Table t = newTable("default", "mtnb", false);
- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, null, 1L, 2L, 2);
- addDeltaFile(conf, t, null, 3L, 4L, 2);
+ addDeltaFile(t, null, 1L, 2L, 2);
+ addDeltaFile(t, null, 3L, 4L, 2);
burnThroughTransactions(5);
CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR);
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -396,18 +388,16 @@ public class TestWorker extends Compacto
LOG.debug("Starting majorTableWithBase");
Table t = newTable("default", "matwb", false);
- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
burnThroughTransactions(25);
CompactionRequest rqst = new CompactionRequest("default", "matwb", CompactionType.MAJOR);
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -442,11 +432,10 @@ public class TestWorker extends Compacto
LOG.debug("Starting majorPartitionWithBase");
Table t = newTable("default", "mapwb", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
burnThroughTransactions(25);
@@ -454,7 +443,7 @@ public class TestWorker extends Compacto
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -489,17 +478,15 @@ public class TestWorker extends Compacto
LOG.debug("Starting majorTableNoBase");
Table t = newTable("default", "matnb", false);
- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, null, 1L, 2L, 2);
- addDeltaFile(conf, t, null, 3L, 4L, 2);
+ addDeltaFile(t, null, 1L, 2L, 2);
+ addDeltaFile(t, null, 3L, 4L, 2);
burnThroughTransactions(5);
CompactionRequest rqst = new CompactionRequest("default", "matnb", CompactionType.MAJOR);
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -534,18 +521,16 @@ public class TestWorker extends Compacto
LOG.debug("Starting majorTableLegacy");
Table t = newTable("default", "matl", false);
- HiveConf conf = new HiveConf();
-
- addLegacyFile(conf, t, null, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addLegacyFile(t, null, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
burnThroughTransactions(25);
CompactionRequest rqst = new CompactionRequest("default", "matl", CompactionType.MAJOR);
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -580,18 +565,16 @@ public class TestWorker extends Compacto
LOG.debug("Starting minorTableLegacy");
Table t = newTable("default", "mtl", false);
- HiveConf conf = new HiveConf();
-
- addLegacyFile(conf, t, null, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addLegacyFile(t, null, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
burnThroughTransactions(25);
CompactionRequest rqst = new CompactionRequest("default", "mtl", CompactionType.MINOR);
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -622,11 +605,10 @@ public class TestWorker extends Compacto
public void majorPartitionWithBaseMissingBuckets() throws Exception {
Table t = newTable("default", "mapwbmb", true);
Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();
- addBaseFile(conf, t, p, 20L, 20, 2, false);
- addDeltaFile(conf, t, p, 21L, 22L, 2, 2, false);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20, 2, false);
+ addDeltaFile(t, p, 21L, 22L, 2, 2, false);
+ addDeltaFile(t, p, 23L, 24L, 2);
burnThroughTransactions(25);
@@ -634,7 +616,7 @@ public class TestWorker extends Compacto
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
- startWorker(new HiveConf());
+ startWorker();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -669,9 +651,4 @@ public class TestWorker extends Compacto
}
Assert.assertTrue(sawNewBase);
}
-
- @Before
- public void setUpTxnDb() throws Exception {
- TxnDbUtil.setConfValues(new HiveConf());
- }
}