You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/06/02 07:28:40 UTC
[hive] branch master updated: HIVE-23555: Cancel compaction jobs
when hive.compactor.worker.timeout is reached (Peter Vary reviewed by Karen
Coppage and Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4b67087 HIVE-23555: Cancel compaction jobs when hive.compactor.worker.timeout is reached (Peter Vary reviewed by Karen Coppage and Laszlo Pinter)
4b67087 is described below
commit 4b670877c280b37c5776046f66d766079489b2a8
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Tue Jun 2 09:27:47 2020 +0200
HIVE-23555: Cancel compaction jobs when hive.compactor.worker.timeout is reached (Peter Vary reviewed by Karen Coppage and Laszlo Pinter)
---
.../hive/hcatalog/streaming/TestStreaming.java | 3 +-
.../hive/ql/txn/compactor/CompactorTestUtil.java | 57 +--
.../hive/ql/txn/compactor/TestCompactor.java | 4 +-
.../hadoop/hive/ql/stats/StatsUpdaterThread.java | 3 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 11 +-
.../hive/ql/txn/compactor/CompactorThread.java | 12 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 4 +-
.../ql/txn/compactor/MetaStoreCompactorThread.java | 4 +-
.../ql/txn/compactor/RemoteCompactorThread.java | 6 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 458 ++++++++++++---------
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 2 +-
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 3 +-
.../hive/ql/stats/TestStatsUpdaterThread.java | 2 +-
.../hive/ql/txn/compactor/CompactorTest.java | 17 +-
.../hadoop/hive/ql/txn/compactor/TestWorker.java | 88 ++++
.../hadoop/hive/metastore/HiveMetaStore.java | 2 +-
.../hadoop/hive/metastore/MetaStoreThread.java | 9 +-
.../org/apache/hive/streaming/TestStreaming.java | 3 +-
18 files changed, 377 insertions(+), 311 deletions(-)
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 569de70..3db56a0 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -433,8 +433,7 @@ public class TestStreaming {
Worker t = new Worker();
t.setThreadId((int) t.getId());
t.setConf(hiveConf);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
+ t.init(stop);
t.run();
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index e70d878..999ac7e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -118,7 +118,7 @@ class CompactorTestUtil {
Worker t = new Worker();
t.setThreadId((int) t.getId());
t.setConf(hiveConf);
- t.init(new AtomicBoolean(true), new AtomicBoolean());
+ t.init(new AtomicBoolean(true));
if (partNames.length == 0) {
txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
t.run();
@@ -139,67 +139,14 @@ class CompactorTestUtil {
*/
static void runCleaner(HiveConf hConf) throws Exception {
HiveConf hiveConf = new HiveConf(hConf);
- AtomicBoolean stop = new AtomicBoolean(true);
Cleaner t = new Cleaner();
t.setThreadId((int) t.getId());
t.setConf(hiveConf);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
+ t.init(new AtomicBoolean(true));
t.run();
}
/**
- * Trigger compaction initiator.
- * @param hConf hive configuration
- * @param isQueryBased run compaction as query based
- * @throws Exception if initiator cannot be started.
- */
- static void runInitiator(HiveConf hConf, boolean isQueryBased) throws Exception {
- HiveConf hiveConf = new HiveConf(hConf);
- hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased);
- AtomicBoolean stop = new AtomicBoolean(true);
- Initiator t = new Initiator();
- t.setThreadId((int) t.getId());
- t.setConf(hiveConf);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
- t.run();
- }
-
- /**
- * Trigger compaction worker.
- * @param hConf hive configuration
- * @param isQueryBased run compaction as query based
- * @throws Exception if worker cannot be started.
- */
- static void runWorker(HiveConf hConf, boolean isQueryBased) throws Exception {
- HiveConf hiveConf = new HiveConf(hConf);
- hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased);
- AtomicBoolean stop = new AtomicBoolean(true);
- Worker t = new Worker();
- t.setThreadId((int) t.getId());
- t.setConf(hiveConf);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
- t.run();
- }
-
- /**
- * Execute Hive CLI statement.
- * @param cmd arbitrary statement to execute
- * @param driver execution driver
- * @throws Exception failed to execute statement
- */
- void executeStatementOnDriver(String cmd, IDriver driver) throws Exception {
- LOG.debug("Executing: " + cmd);
- try {
- driver.run(cmd);
- } catch (CommandProcessorException e) {
- throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e);
- }
- }
-
- /**
* Execute Hive CLI statement and get back result.
* @param cmd arbitrary statement to execute
* @param driver execution driver
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 32fe535..9e19105 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1120,7 +1120,7 @@ public class TestCompactor {
Worker t = new Worker();
t.setThreadId((int) t.getId());
t.setConf(conf);
- t.init(new AtomicBoolean(true), new AtomicBoolean());
+ t.init(new AtomicBoolean(true));
if (partNames.length == 0) {
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
t.run();
@@ -1372,7 +1372,7 @@ public class TestCompactor {
Worker t = new Worker();
t.setThreadId((int) t.getId());
t.setConf(conf);
- t.init(new AtomicBoolean(true), new AtomicBoolean());
+ t.init(new AtomicBoolean(true));
CompactionRequest Cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
Cr.setProperties(tblProperties);
txnHandler.compact(Cr);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index ecaad50..7d6e122 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -130,9 +130,8 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
}
@Override
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+ public void init(AtomicBoolean stop) throws MetaException {
this.stop = stop;
- this.looped = looped;
setPriority(MIN_PRIORITY);
setDaemon(true);
String user = "anonymous";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 5fa3d9a..2a15913 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -68,8 +68,8 @@ public class Cleaner extends MetaStoreCompactorThread {
private ReplChangeManager replChangeManager;
@Override
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
- super.init(stop, looped);
+ public void init(AtomicBoolean stop) throws Exception {
+ super.init(stop);
replChangeManager = ReplChangeManager.getInstance(conf);
}
@@ -81,10 +81,6 @@ public class Cleaner extends MetaStoreCompactorThread {
}
do {
- // This is solely for testing. It checks if the test has set the looped value to false,
- // and if so remembers that and then sets it to true at the end. We have to check here
- // first to make sure we go through a complete iteration of the loop before resetting it.
- boolean setLooped = !looped.get();
TxnStore.MutexAPI.LockHandle handle = null;
long startedAt = -1;
// Make sure nothing escapes this run method and kills the metastore at large,
@@ -105,9 +101,6 @@ public class Cleaner extends MetaStoreCompactorThread {
handle.releaseLocks();
}
}
- if (setLooped) {
- looped.set(true);
- }
// Now, go back to bed until it's time to do this again
long elapsedTime = System.currentTimeMillis() - startedAt;
if (elapsedTime >= cleanerCheckInterval || stop.get()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index b378d40..fb23c2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,7 +56,6 @@ public abstract class CompactorThread extends Thread implements Configurable {
protected HiveConf conf;
protected AtomicBoolean stop;
- protected AtomicBoolean looped;
protected int threadId;
@@ -78,11 +78,10 @@ public abstract class CompactorThread extends Thread implements Configurable {
return conf;
}
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
+ public void init(AtomicBoolean stop) throws Exception {
setPriority(MIN_PRIORITY);
setDaemon(true); // this means the process will exit without waiting for this thread
this.stop = stop;
- this.looped = looped;
}
/**
@@ -120,11 +119,12 @@ public abstract class CompactorThread extends Thread implements Configurable {
return null;
}
} catch (Exception e) {
- LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage());
+ LOG.error("Unable to find partition " + ci.getFullPartitionName(), e);
throw e;
}
if (parts.size() != 1) {
- LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts);
+ LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " +
+ Arrays.toString(parts.toArray()));
throw new MetaException("Too many partitions for : " + ci.getFullPartitionName());
}
return parts.get(0);
@@ -218,7 +218,7 @@ public abstract class CompactorThread extends Thread implements Configurable {
LOG.info("Starting compactor thread of type " + thread.getClass().getName());
thread.setConf(conf);
thread.setThreadId(nextThreadId.incrementAndGet());
- thread.init(new AtomicBoolean(), new AtomicBoolean());
+ thread.init(new AtomicBoolean());
thread.start();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index fa2ede3..7913295 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -229,8 +229,8 @@ public class Initiator extends MetaStoreCompactorThread {
}
@Override
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
- super.init(stop, looped);
+ public void init(AtomicBoolean stop) throws Exception {
+ super.init(stop);
checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
compactionExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE));
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index aa258b3..5d7d810 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -51,8 +51,8 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
}
@Override
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
- super.init(stop, looped);
+ public void init(AtomicBoolean stop) throws Exception {
+ super.init(stop);
// Get our own instance of the transaction handler
txnHandler = TxnUtils.getTxnStore(conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
index 4235184..a4dfdda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -42,8 +42,8 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
public class RemoteCompactorThread extends CompactorThread {
protected IMetaStoreClient msc;
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
- super.init(stop, looped);
+ public void init(AtomicBoolean stop) throws Exception {
+ super.init(stop);
this.msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
}
@@ -51,7 +51,7 @@ public class RemoteCompactorThread extends CompactorThread {
try {
return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
} catch (TException e) {
- LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage());
+ LOG.error("Unable to find table " + ci.getFullTableName(), e);
throw new MetaException(e.toString());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 8180adc..a67db8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
@@ -32,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.common.util.Ref;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.net.InetAddress;
@@ -56,7 +55,12 @@ import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -71,7 +75,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
private static final int NOT_SET = -1;
private String workerName;
- private JobConf mrJob; // the MR job for compaction
/**
* Get the hostname that this worker is run on. Made static and public so that other classes
@@ -86,207 +89,55 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
throw new RuntimeException(e);
}
}
-//todo: this doesn;t check if compaction is already running (even though Initiator does but we
-// don't go through Initiator for user initiated compactions)
+
+ // TODO: this doesn't check if compaction is already running (even though Initiator does but we
+ // don't go through Initiator for user initiated compactions)
@Override
public void run() {
LOG.info("Starting Worker thread");
boolean computeStats = conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS);
- do {
- boolean launchedJob = false;
- // Make sure nothing escapes this run method and kills the metastore at large,
- // so wrap it in a big catch Throwable statement.
- CompactionHeartbeater heartbeater = null;
- CompactionInfo ci = null;
- long compactorTxnId = NOT_SET;
- try {
- if (msc == null) {
- msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+ long timeout = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);
+ boolean launchedJob;
+ ExecutorService executor = getTimeoutHandlingExecutor();
+ try {
+ do {
+ Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(computeStats));
+ try {
+ launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException te) {
+ LOG.info("Timeout during executing compaction", te);
+ // Cancel the job, and recreate the Executor as well, so we can be sure that we have an available thread
+ // even if we can not interrupt the task somehow. (Trade possible resource hogging for compactor stability)
+ singleRun.cancel(true);
+ executor.shutdownNow();
+ executor = getTimeoutHandlingExecutor();
+ launchedJob = true;
+ } catch (ExecutionException e) {
+ LOG.info("Exception during executing compaction", e);
+ launchedJob = true;
+ } catch (InterruptedException ie) {
+ // Do not do anything - stop should be set anyway
+ launchedJob = true;
}
- ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName));
- LOG.debug("Processing compaction request " + ci);
- if (ci == null && !stop.get()) {
+ // If we didn't try to launch a job it either means there was no work to do or we got
+ // here as the result of a communication failure with the DB. Either way we want to wait
+ // a bit before we restart the loop.
+ if (!launchedJob && !stop.get()) {
try {
Thread.sleep(SLEEP_TIME);
- continue;
} catch (InterruptedException e) {
- LOG.warn("Worker thread sleep interrupted " + e.getMessage());
- continue;
- }
- }
-
- // Find the table we will be working with.
- Table t1 = null;
- try {
- t1 = resolveTable(ci);
- if (t1 == null) {
- LOG.info("Unable to find table " + ci.getFullTableName() +
- ", assuming it was dropped and moving on.");
- msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
- continue;
- }
- } catch (MetaException e) {
- msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
- continue;
- }
- // This chicanery is to get around the fact that the table needs to be final in order to
- // go into the doAs below.
- final Table t = t1;
-
- // Find the partition we will be working with, if there is one.
- Partition p = null;
- try {
- p = resolvePartition(ci);
- if (p == null && ci.partName != null) {
- LOG.info("Unable to find partition " + ci.getFullPartitionName() +
- ", assuming it was dropped and moving on.");
- msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
- continue;
- }
- } catch (Exception e) {
- msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
- continue;
- }
-
- // Find the appropriate storage descriptor
- final StorageDescriptor sd = resolveStorageDescriptor(t, p);
-
- // Check that the table or partition isn't sorted, as we don't yet support that.
- if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
- LOG.error("Attempt to compact sorted table "+ci.getFullTableName()+", which is not yet supported!");
- msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
- continue;
- }
- String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
- if (ci.runAs == null) {
- ci.runAs = findUserToRunAs(sd.getLocation(), t);
- }
- /**
- * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since
- * then the Driver would already have the an open txn but then this txn would have
- * multiple statements in it (for query based compactor) which is not supported (and since
- * this case some of the statements are DDL, even in the future will not be allowed in a
- * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
- compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
-
- heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
- heartbeater.start();
-
- ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId);
- //with this ValidWriteIdList is capped at whatever HWM validTxnList has
- final ValidCompactorWriteIdList tblValidWriteIds =
- TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
- Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
- LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-
- ci.highestWriteId = tblValidWriteIds.getHighWatermark();
- //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
- //it until after any data written by it are physically removed
- msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
- final StringBuilder jobName = new StringBuilder(workerName);
- jobName.append("-compactor-");
- jobName.append(ci.getFullPartitionName());
-
- // Don't start compaction or cleaning if not necessary
- AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
- tblValidWriteIds, Ref.from(false), true, null, false);
- if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
- if (needsCleaning(dir, sd)) {
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- } else {
- // do nothing
- msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
- }
- continue;
- }
-
- LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " +
- JavaUtils.txnIdToString(compactorTxnId) + " with compute stats set to " + computeStats);
- final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats(
- CompactionInfo.compactionInfoToStruct(ci)), conf,
- runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null;
- final CompactorMR mr = new CompactorMR();
- launchedJob = true;
- try {
- if (runJobAsSelf(ci.runAs)) {
- mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir);
- } else {
- UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
- UserGroupInformation.getLoginUser());
- final Partition fp = p;
- final CompactionInfo fci = ci;
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir);
- return null;
- }
- });
- try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
- ci.getFullPartitionName(), exception);
- }
- }
- heartbeater.cancel();
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
- mrJob = mr.getMrJob();
- }
- } catch (Throwable e) {
- LOG.error("Caught exception while trying to compact " + ci +
- ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
- ci.errorMessage = e.getMessage();
- msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
- msc.abortTxns(Collections.singletonList(compactorTxnId));
- compactorTxnId = NOT_SET;
- }
- } catch (TException | IOException t) {
- LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
- StringUtils.stringifyException(t));
- try {
- if (msc != null && ci != null) {
- ci.errorMessage = t.getMessage();
- msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
- compactorTxnId = NOT_SET;
- }
- } catch (TException e) {
- LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e);
- } finally {
- if (msc != null) {
- msc.close();
- msc = null;
}
}
- try {
- Thread.sleep(SLEEP_TIME);
- } catch (InterruptedException e) {
- LOG.error("Interrupted while sleeping to instantiate metastore client");
- }
- } catch (Throwable t) {
- LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
- StringUtils.stringifyException(t));
- compactorTxnId = NOT_SET;
- } finally {
- commitTxnIfSet(compactorTxnId);
- if (heartbeater != null) {
- heartbeater.cancel();
- }
+ } while (!stop.get());
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
}
-
- // If we didn't try to launch a job it either means there was no work to do or we got
- // here as the result of a communication failure with the DB. Either way we want to wait
- // a bit before we restart the loop.
- if (!launchedJob && !stop.get()) {
- try {
- Thread.sleep(SLEEP_TIME);
- } catch (InterruptedException e) {
- }
+ if (msc != null) {
+ msc.close();
}
- } while (!stop.get());
+ }
}
private void commitTxnIfSet(long compactorTxnId) {
@@ -297,15 +148,14 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
}
} catch (TException e) {
LOG.error(
- "Caught an exception while committing compaction in worker " + workerName + ", "
- + StringUtils.stringifyException(e));
+ "Caught an exception while committing compaction in worker " + workerName, e);
}
}
}
@Override
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
- super.init(stop, looped);
+ public void init(AtomicBoolean stop) throws Exception {
+ super.init(stop);
StringBuilder name = new StringBuilder(hostname());
name.append("-");
@@ -314,10 +164,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
setName(name.toString());
}
- public JobConf getMrJob() {
- return mrJob;
- }
-
static final class StatsUpdater {
static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
@@ -516,4 +362,218 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
}
return needsJustCleaning;
}
+
+ /**
+ * Creates a single threaded executor used for handling timeouts.
+ * Its thread copies the thread group, daemon flag and priority of the thread that runs the thread factory, and is named after that thread with a {@code _timeout_executor} suffix.
+ * @return Single threaded executor service to be used for timeout handling
+ */
+ private ExecutorService getTimeoutHandlingExecutor() {
+ return Executors.newSingleThreadExecutor((r) -> {
+ Thread masterThread = Thread.currentThread(); // resolved when the factory runs, not when the executor is built; presumably the Worker thread - TODO confirm
+ Thread t = new Thread(masterThread.getThreadGroup(), r, masterThread.getName() + "_timeout_executor");
+ t.setDaemon(masterThread.isDaemon());
+ t.setPriority(masterThread.getPriority());
+ return t;
+ });
+ }
+
+ /**
+ * Asks the metastore for the next compaction request of this worker and, if there is one, executes it. The main thread
+ * might interrupt the execution of this method in case of timeout; checkInterrupt() is called between the steps.
+ * @param computeStats If true then for MR compaction the stats are regenerated (a StatsUpdater is passed to the job)
+ * @return false if no request was found, or the request was marked cleaned/compacted without launching a job (table or partition could not be resolved, sorted table, nothing to compact); true if a job was attempted or an exception reached one of the outer catch blocks
+ * @throws InterruptedException declared for checkInterrupt(); NOTE(review): those calls sit inside the try whose catch (Throwable) logs and falls through to return true, so it may never propagate - confirm
+ */
+ @VisibleForTesting
+ protected Boolean findNextCompactionAndExecute(boolean computeStats) throws InterruptedException {
+ // Make sure nothing escapes this method and kills the metastore at large,
+ // so wrap it in a big catch Throwable statement.
+ CompactionHeartbeater heartbeater = null;
+ CompactionInfo ci = null;
+ long compactorTxnId = NOT_SET;
+ try {
+ if (msc == null) { // the TException/IOException handler below closes and nulls the client, so it is re-created here
+ msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+ }
+ ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName));
+ LOG.debug("Processing compaction request " + ci);
+
+ if (ci == null) {
+ return false; // no compaction request was returned for this worker
+ }
+
+ checkInterrupt();
+
+ // Find the table we will be working with.
+ Table t1;
+ try {
+ t1 = resolveTable(ci);
+ if (t1 == null) {
+ LOG.info("Unable to find table " + ci.getFullTableName() +
+ ", assuming it was dropped and moving on.");
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ return false;
+ }
+ } catch (MetaException e) {
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ return false;
+ }
+
+ checkInterrupt();
+
+ // This chicanery is to get around the fact that the table needs to be final in order to
+ // go into the doAs below.
+ final Table t = t1;
+ String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+
+ // Find the partition we will be working with, if there is one.
+ Partition p;
+ try {
+ p = resolvePartition(ci);
+ if (p == null && ci.partName != null) {
+ LOG.info("Unable to find partition " + ci.getFullPartitionName() +
+ ", assuming it was dropped and moving on.");
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ return false;
+ }
+ } catch (Exception e) {
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ return false;
+ }
+
+ checkInterrupt();
+
+ // Find the appropriate storage descriptor
+ final StorageDescriptor sd = resolveStorageDescriptor(t, p);
+
+ // Check that the table or partition isn't sorted, as we don't yet support that.
+ if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
+ LOG.error("Attempt to compact sorted table " + ci.getFullTableName() + ", which is not yet supported!");
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ return false;
+ }
+
+ if (ci.runAs == null) {
+ ci.runAs = findUserToRunAs(sd.getLocation(), t);
+ }
+
+ checkInterrupt();
+
+ /**
+ * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since
+ * then the Driver would already have an open txn but then this txn would have
+ * multiple statements in it (for query based compactor) which is not supported (and since
+ * in this case some of the statements are DDL, even in the future will not be allowed in a
+ * multi-stmt txn). {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
+ compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+
+ heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
+ heartbeater.start();
+
+ ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId);
+ //with this ValidWriteIdList is capped at whatever HWM validTxnList has
+ final ValidCompactorWriteIdList tblValidWriteIds =
+ TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
+ Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
+ LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+ conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+ ci.highestWriteId = tblValidWriteIds.getHighWatermark();
+ //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
+ //it until after any data written by it are physically removed
+ msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
+
+ checkInterrupt();
+
+ final StringBuilder jobName = new StringBuilder(workerName);
+ jobName.append("-compactor-");
+ jobName.append(ci.getFullPartitionName());
+
+ // Don't start compaction or cleaning if not necessary
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
+ tblValidWriteIds, Ref.from(false), true, null, false);
+ if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+ if (needsCleaning(dir, sd)) {
+ msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+ } else {
+ // nothing to compact and nothing to clean: just mark the request as cleaned
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ }
+ return false;
+ }
+
+ checkInterrupt();
+
+ LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " +
+ JavaUtils.txnIdToString(compactorTxnId) + " with compute stats set to " + computeStats);
+ final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats(
+ CompactionInfo.compactionInfoToStruct(ci)), conf,
+ runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null;
+ final CompactorMR mr = new CompactorMR();
+ try {
+ if (runJobAsSelf(ci.runAs)) {
+ mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir);
+ } else {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+ UserGroupInformation.getLoginUser());
+ final Partition fp = p;
+ final CompactionInfo fci = ci;
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir);
+ return null;
+ }
+ });
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+ ci.getFullPartitionName(), exception);
+ }
+ }
+ heartbeater.cancel();
+ msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+ } catch (Throwable e) {
+ LOG.error("Caught exception while trying to compact " + ci +
+ ". Marking failed to avoid repeated failures", e);
+ ci.errorMessage = e.getMessage();
+ msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+ msc.abortTxns(Collections.singletonList(compactorTxnId));
+ compactorTxnId = NOT_SET; // the txn was aborted on the line above; NOT_SET presumably makes commitTxnIfSet in the finally block skip it - confirm
+ }
+ } catch (TException | IOException t) {
+ LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t);
+ try {
+ if (msc != null && ci != null) {
+ ci.errorMessage = t.getMessage();
+ msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+ compactorTxnId = NOT_SET; // NOTE(review): presumably keeps the finally block from committing the txn of a failed compaction - confirm in commitTxnIfSet
+ }
+ } catch (TException e) {
+ LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e);
+ } finally {
+ if (msc != null) {
+ msc.close();
+ msc = null;
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t);
+ compactorTxnId = NOT_SET; // this handler also receives the InterruptedException thrown by checkInterrupt()
+ } finally {
+ commitTxnIfSet(compactorTxnId);
+ if (heartbeater != null) {
+ heartbeater.cancel();
+ }
+ }
+ return true; // reached after a job attempt or after one of the outer catch blocks
+ }
+
+ private void checkInterrupt() throws InterruptedException { // cancellation checkpoint, called between the steps of findNextCompactionAndExecute
+ if (Thread.interrupted()) { // Thread.interrupted() also clears the interrupt flag of the current thread
+ throw new InterruptedException("Compaction execution is interrupted");
+ }
+ }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 366282a..337f469 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1041,7 +1041,7 @@ public class TestTxnCommands2 {
Initiator init = new Initiator();
init.setThreadId((int)init.getId());
init.setConf(hiveConf);
- init.init(stop, new AtomicBoolean());
+ init.init(stop);
init.run();
int numAttemptedCompactions = 1;
checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 3ff68a3..91752e8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -197,8 +197,7 @@ public abstract class TxnCommandsBaseForTests {
}
t.setThreadId((int) t.getId());
t.setConf(hiveConf);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
+ t.init(stop);
t.run();
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index 84827d1..afe6070 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -722,7 +722,7 @@ public class TestStatsUpdaterThread {
private StatsUpdaterThread createUpdater() throws MetaException {
StatsUpdaterThread su = new StatsUpdaterThread();
su.setConf(hiveConf);
- su.init(new AtomicBoolean(false), null);
+ su.init(new AtomicBoolean(false));
return su;
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 9a9ab53..84a23b2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -124,10 +124,6 @@ public abstract class CompactorTest {
startThread('c', true);
}
- protected void startCleaner(AtomicBoolean looped) throws Exception {
- startThread('c', false, looped);
- }
-
protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
return newTable(dbName, tableName, partitioned, new HashMap<String, String>(), null, false);
}
@@ -138,10 +134,6 @@ public abstract class CompactorTest {
}
- protected Table newTempTable(String tableName) throws TException {
- return newTable("default", tableName, false, null, null, true);
- }
-
protected Table newTable(String dbName, String tableName, boolean partitioned,
Map<String, String> parameters, List<Order> sortCols,
boolean isTemporary)
@@ -298,13 +290,8 @@ public abstract class CompactorTest {
// I can't do this with @Before because I want to be able to control when the thread starts
private void startThread(char type, boolean stopAfterOne) throws Exception {
- startThread(type, stopAfterOne, new AtomicBoolean());
- }
-
- private void startThread(char type, boolean stopAfterOne, AtomicBoolean looped)
- throws Exception {
TxnDbUtil.setConfValues(conf);
- CompactorThread t = null;
+ CompactorThread t;
switch (type) {
case 'i': t = new Initiator(); break;
case 'w': t = new Worker(); break;
@@ -314,7 +301,7 @@ public abstract class CompactorTest {
t.setThreadId((int) t.getId());
t.setConf(conf);
stop.set(stopAfterOne);
- t.init(stop, looped);
+ t.init(stop);
if (stopAfterOne) t.run();
else t.start();
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 443f982..73eb341 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import it.unimi.dsi.fastutil.booleans.AbstractBooleanBidirectionalIterator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -59,6 +60,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Tests for the worker thread and its MR jobs.
@@ -1067,8 +1073,90 @@ public class TestWorker extends CompactorTest {
Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
}
+ // With a high timeout (10000 ms) and a run that returns immediately, the worker should loop without a problem
+ @Test(timeout=1000)
+ public void testNormalRun() throws Exception {
+ runTimeoutTest(10000, false, true);
+ }
+
+ // With a low timeout (1 ms) and a run that never returns on its own, the worker should still reach its next loop when the interrupt is propagated
+ @Test(timeout=1000)
+ public void testTimeoutWithInterrupt() throws Exception {
+ runTimeoutTest(1, true, false);
+ }
+
+ // With a low timeout (1 ms) and a run that never returns on its own, the worker should still reach its next loop, even if the running task swallows the interrupt
+ @Test(timeout=1000)
+ public void testTimeoutWithoutInterrupt() throws Exception {
+ runTimeoutTest(1, true, true);
+ }
+
+ /**
+ * Runs a {@link TimeoutWorker} on a dedicated single-thread executor and waits until the worker
+ * has raised its {@code looped} flag twice.
+ * The stop flag and the executor are handled in a finally block, so a failed or interrupted wait
+ * does not leave the worker thread running behind the test.
+ * @param timeout value set for HIVE_COMPACTOR_WORKER_TIMEOUT, in milliseconds
+ * @param runForever if true, the stubbed compaction step blocks instead of returning
+ * @param swallowInterrupt if true, the stubbed compaction step swallows the first interrupt it receives
+ * @throws Exception if the worker can not be created, or the waiting is interrupted
+ */
+ private void runTimeoutTest(long timeout, boolean runForever, boolean swallowInterrupt) throws Exception {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ TimeoutWorker timeoutWorker = null;
+ try {
+ HiveConf timeoutConf = new HiveConf(conf);
+ timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, timeout, TimeUnit.MILLISECONDS);
+ timeoutWorker = getTimeoutWorker(timeoutConf, executor, runForever, swallowInterrupt);
+
+ // Wait until the worker has flagged its 1st loop
+ while (!timeoutWorker.looped.get()) {
+ Thread.sleep(10L);
+ }
+
+ timeoutWorker.looped.set(false);
+
+ // Wait until the worker has flagged its 2nd loop
+ while (!timeoutWorker.looped.get()) {
+ Thread.sleep(10L);
+ }
+ } finally {
+ // Clean up even when the waiting above throws, otherwise the worker thread would be leaked
+ if (timeoutWorker != null) {
+ timeoutWorker.stop.set(true);
+ }
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Creates a {@link TimeoutWorker}, initializes it with a fresh (unset) stop flag and submits its
+ * run() method to the given executor.
+ * @param conf configuration handed over to the worker
+ * @param executor executor which will run the worker
+ * @param runForever passed through to the {@link TimeoutWorker} constructor
+ * @param swallowInterrupt passed through to the {@link TimeoutWorker} constructor
+ * @return the already submitted worker
+ * @throws Exception if the initialization of the worker fails
+ */
+ private TimeoutWorker getTimeoutWorker(HiveConf conf, ExecutorService executor, boolean runForever,
+ boolean swallowInterrupt) throws Exception {
+ final TimeoutWorker worker = new TimeoutWorker(runForever, swallowInterrupt);
+ final int threadId = (int) worker.getId();
+ worker.setThreadId(threadId);
+ worker.setConf(conf);
+ final AtomicBoolean stopFlag = new AtomicBoolean(false);
+ worker.init(stopFlag);
+ executor.submit(() -> worker.run());
+ return worker;
+ }
+
@After
public void tearDown() throws Exception {
compactorTestCleanup();
}
+
+ /**
+ * Worker for the timeout tests: the compaction step is replaced by a stub which either returns
+ * immediately, or sleeps until it is interrupted or the stop flag is set.
+ */
+ private static final class TimeoutWorker extends Worker {
+ // If true, the stub blocks in a sleep instead of returning
+ private final boolean runForever;
+ // If true, the first InterruptedException is swallowed instead of being rethrown
+ private final boolean swallowInterrupt;
+ // Set to true every time the stub is entered; the test resets it to detect the next loop
+ private final AtomicBoolean looped;
+
+ private TimeoutWorker(boolean runForever, boolean swallowInterrupt) {
+ this.runForever = runForever;
+ this.swallowInterrupt = swallowInterrupt;
+ this.looped = new AtomicBoolean(false);
+ }
+
+ /**
+ * Stub of the compaction step: flags the loop, then optionally blocks.
+ * @param computeStats not used by the stub
+ * @return always true
+ * @throws InterruptedException if interrupted while sleeping and the interrupt is not swallowed,
+ * or if interrupted again during the sleep which follows a swallowed interrupt
+ */
+ @Override
+ protected Boolean findNextCompactionAndExecute(boolean computeStats) throws InterruptedException {
+ looped.set(true);
+ if (runForever) {
+ while (!stop.get()) {
+ try {
+ // NOTE(review): the flag is already set above; this second set can hide a reset done by the test - confirm it is needed
+ looped.set(true);
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException ie) {
+ if (!swallowInterrupt) {
+ throw ie;
+ }
+ // The interrupt is swallowed, keep blocking as an uncooperative task would do
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ }
+ }
+ return true;
+ }
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index e20fdaf..6d68035 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -10548,7 +10548,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
LOG.info("Starting metastore thread of type " + thread.getClass().getName());
thread.setConf(conf);
thread.setThreadId(nextThreadId++);
- thread.init(new AtomicBoolean(), new AtomicBoolean());
+ thread.init(new AtomicBoolean());
thread.start();
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
index ea61552..57bf36b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
@@ -39,18 +39,13 @@ public interface MetaStoreThread extends Configurable {
* have been called.
* @param stop a flag to watch for when to stop. If this value is set to true,
* the thread will terminate the next time through its main loop.
- * @param looped a flag that is set to true everytime a thread goes through it's main loop.
- * This is purely for testing so that tests can assure themselves that the thread
- * has run through it's loop once. The test can set this value to false. The
- * thread should then assure that the loop has been gone completely through at
- * least once.
*/
// TODO: move these test parameters to more specific places... there's no need to have them here
- void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception;
+ void init(AtomicBoolean stop) throws Exception;
/**
* Run the thread in the background. This must not be called until
- * {@link MetaStoreThread#init(java.util.concurrent.atomic.AtomicBoolean,java.util.concurrent.atomic.AtomicBoolean)} has
+ * {@link MetaStoreThread#init(java.util.concurrent.atomic.AtomicBoolean)} has
* been called.
*/
void start();
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 6101caa..3a3b267 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -786,8 +786,7 @@ public class TestStreaming {
Worker t = new Worker();
t.setThreadId((int) t.getId());
t.setConf(hiveConf);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
+ t.init(stop);
t.run();
}