Posted to commits@hive.apache.org by om...@apache.org on 2014/04/11 00:33:47 UTC
svn commit: r1586489 - in /hive/branches/branch-0.13: ./
metastore/src/java/org/apache/hadoop/hive/metastore/
metastore/src/java/org/apache/hadoop/hive/metastore/txn/
metastore/src/test/org/apache/hadoop/hive/metastore/txn/ ql/
ql/src/java/org/apache/h...
Author: omalley
Date: Thu Apr 10 22:33:46 2014
New Revision: 1586489
URL: http://svn.apache.org/r1586489
Log:
HIVE-6319. Add compactor for ACID tables. (Alan Gates via omalley)
Added:
hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
- copied unchanged from r1586488, hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/txn/
- copied from r1586488, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/txn/
- copied from r1586488, hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/
Modified:
hive/branches/branch-0.13/ (props changed)
hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
hive/branches/branch-0.13/ql/pom.xml
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
Propchange: hive/branches/branch-0.13/
------------------------------------------------------------------------------
Merged /hive/trunk:r1586488
Modified: hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Thu Apr 10 22:33:46 2014
@@ -42,6 +42,10 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.cli.OptionBuilder;
@@ -5077,7 +5081,12 @@ public class HiveMetaStore extends Thrif
}
});
- startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf);
+ Lock startLock = new ReentrantLock();
+ Condition startCondition = startLock.newCondition();
+ MetaStoreThread.BooleanPointer startedServing = new MetaStoreThread.BooleanPointer();
+ startMetaStoreThreads(conf, startLock, startCondition, startedServing);
+ startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf, startLock,
+ startCondition, startedServing);
} catch (Throwable t) {
// Catch the exception, log it and rethrow it.
HMSHandler.LOG
@@ -5095,7 +5104,19 @@ public class HiveMetaStore extends Thrif
*/
public static void startMetaStore(int port, HadoopThriftAuthBridge bridge)
throws Throwable {
- startMetaStore(port, bridge, new HiveConf(HMSHandler.class));
+ startMetaStore(port, bridge, new HiveConf(HMSHandler.class), null, null, null);
+ }
+
+ /**
+ * Start the metastore server.
+ * @param port
+ * @param bridge
+ * @param conf
+ * @throws Throwable
+ */
+ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
+ HiveConf conf) throws Throwable {
+ startMetaStore(port, bridge, conf, null, null, null);
}
/**
@@ -5108,7 +5129,8 @@ public class HiveMetaStore extends Thrif
* @throws Throwable
*/
public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
- HiveConf conf) throws Throwable {
+ HiveConf conf, Lock startLock, Condition startCondition,
+ MetaStoreThread.BooleanPointer startedServing) throws Throwable {
try {
// Server will create new threads up to max as necessary. After an idle
@@ -5176,6 +5198,10 @@ public class HiveMetaStore extends Thrif
HMSHandler.LOG.info("Options.maxWorkerThreads = "
+ maxWorkerThreads);
HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
+
+ if (startLock != null) {
+ signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
+ }
tServer.serve();
} catch (Throwable x) {
x.printStackTrace();
@@ -5183,4 +5209,119 @@ public class HiveMetaStore extends Thrif
throw x;
}
}
+
+ private static void signalOtherThreadsToStart(final TServer server, final Lock startLock,
+ final Condition startCondition,
+ final MetaStoreThread.BooleanPointer startedServing) {
+ // A simple thread to wait until the server has started and then signal the other threads to
+ // begin
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ do {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Signalling thread was interrupted: " + e.getMessage());
+ }
+ } while (!server.isServing());
+ startLock.lock();
+ try {
+ startedServing.boolVal = true;
+ startCondition.signalAll();
+ } finally {
+ startLock.unlock();
+ }
+ }
+ };
+ t.start();
+ }
+
+ /**
+ * Start threads outside of the thrift service, such as the compactor threads.
+ * @param conf Hive configuration object
+ */
+ private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock,
+ final Condition startCondition, final
+ MetaStoreThread.BooleanPointer startedServing) {
+ // A thread is spun up to start these other threads. That's because we can't start them
+ // until after the TServer has started, but once TServer.serve is called we aren't given back
+ // control.
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ // This is a massive hack. The compactor threads have to access packages in ql (such as
+ // AcidInputFormat). ql depends on metastore so we can't directly access those. To deal
+ // with this the compactor thread classes have been put in ql and they are instantiated here
+ // dynamically. This is not ideal, but it avoids a massive refactoring of Hive packages.
+ //
+ // Wrap the start of the threads in a catch-Throwable block so that any failures
+ // don't doom the rest of the metastore.
+ startLock.lock();
+ try {
+ // Per the javadocs on Condition, do not depend on the condition alone as a start gate
+ // since spurious wake ups are possible.
+ while (!startedServing.boolVal) startCondition.await();
+ startCompactorInitiator(conf);
+ startCompactorWorkers(conf);
+ startCompactorCleaner(conf);
+ } catch (Throwable e) {
+ LOG.error("Failure when starting the compactor, compactions may not happen, " +
+ StringUtils.stringifyException(e));
+ } finally {
+ startLock.unlock();
+ }
+ }
+ };
+
+ t.start();
+ }
+
+ private static void startCompactorInitiator(HiveConf conf) throws Exception {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+ MetaStoreThread initiator =
+ instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
+ initializeAndStartThread(initiator, conf);
+ }
+ }
+
+ private static void startCompactorWorkers(HiveConf conf) throws Exception {
+ int numWorkers = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_THREADS);
+ for (int i = 0; i < numWorkers; i++) {
+ MetaStoreThread worker =
+ instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker");
+ initializeAndStartThread(worker, conf);
+ }
+ }
+
+ private static void startCompactorCleaner(HiveConf conf) throws Exception {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+ MetaStoreThread cleaner =
+ instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
+ initializeAndStartThread(cleaner, conf);
+ }
+ }
+
+ private static MetaStoreThread instantiateThread(String classname) throws Exception {
+ Class c = Class.forName(classname);
+ Object o = c.newInstance();
+ if (MetaStoreThread.class.isAssignableFrom(o.getClass())) {
+ return (MetaStoreThread)o;
+ } else {
+ String s = classname + " is not an instance of MetaStoreThread.";
+ LOG.error(s);
+ throw new IOException(s);
+ }
+ }
+
+ private static int nextThreadId = 1000000;
+
+ private static void initializeAndStartThread(MetaStoreThread thread, HiveConf conf) throws
+ MetaException {
+ LOG.info("Starting metastore thread of type " + thread.getClass().getName());
+ thread.setHiveConf(conf);
+ thread.setThreadId(nextThreadId++);
+ thread.init(new MetaStoreThread.BooleanPointer());
+ thread.start();
+ }
}
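
[Editorial aside] The startMetaStoreThreads/signalOtherThreadsToStart pair above implements a standard Lock/Condition start gate: the compactor threads block until the Thrift server reports isServing(), and the BooleanPointer flag is re-checked in a loop because, per the Condition javadocs, waiters can wake spuriously. A minimal standalone sketch of the same idiom (the StartGate class and its members are illustrative, not part of the patch):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class StartGate {
      private final Lock lock = new ReentrantLock();
      private final Condition started = lock.newCondition();
      private boolean serving = false;  // plays the role of BooleanPointer.boolVal

      // Worker side: block until the server signals that it is serving.
      public void awaitStart() throws InterruptedException {
        lock.lock();
        try {
          // Loop on the flag, not the Condition alone, to survive spurious wakeups.
          while (!serving) {
            started.await();
          }
        } finally {
          lock.unlock();
        }
      }

      // Server side: called once the server is actually serving requests.
      public void signalStarted() {
        lock.lock();
        try {
          serving = true;
          started.signalAll();
        } finally {
          lock.unlock();
        }
      }
    }

The same reasoning explains why signalOtherThreadsToStart polls server.isServing() in a sleep loop: TServer.serve() never returns control to its caller, so a helper thread has to observe the state change and fire the signal.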
Modified: hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Thu Apr 10 22:33:46 2014
@@ -51,13 +51,14 @@ public class CompactionTxnHandler extend
* @return list of CompactionInfo structs. These will not have id, type,
* or runAs set since these are only potential compactions not actual ones.
*/
- public List<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+ public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
Connection dbConn = getDbConn();
- List<CompactionInfo> response = new ArrayList<CompactionInfo>();
+ Set<CompactionInfo> response = new HashSet<CompactionInfo>();
try {
Statement stmt = dbConn.createStatement();
// Check for completed transactions
- String s = "select ctc_database, ctc_table, ctc_partition from COMPLETED_TXN_COMPONENTS";
+ String s = "select distinct ctc_database, ctc_table, " +
+ "ctc_partition from COMPLETED_TXN_COMPONENTS";
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
while (rs.next()) {
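
[Editorial aside] The switch from List to Set, together with the DISTINCT added to the query, deduplicates potential compactions so that a (database, table, partition) triple appearing in many completed transaction components is returned only once. For the HashSet half of that to work, CompactionInfo must define equals() and hashCode() over those identifying fields; a sketch of that contract with an illustrative stand-in class (field names assumed, not copied from CompactionInfo):

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;

    // Illustrative stand-in for CompactionInfo's identity; the real class in
    // org.apache.hadoop.hive.metastore.txn may differ in detail.
    public class CompactionKey {
      final String dbname;
      final String tableName;
      final String partName;  // null for unpartitioned tables

      CompactionKey(String dbname, String tableName, String partName) {
        this.dbname = dbname;
        this.tableName = tableName;
        this.partName = partName;
      }

      @Override
      public boolean equals(Object other) {
        if (!(other instanceof CompactionKey)) return false;
        CompactionKey o = (CompactionKey) other;
        return Objects.equals(dbname, o.dbname)
            && Objects.equals(tableName, o.tableName)
            && Objects.equals(partName, o.partName);
      }

      @Override
      public int hashCode() {
        return Objects.hash(dbname, tableName, partName);
      }

      public static void main(String[] args) {
        Set<CompactionKey> set = new HashSet<CompactionKey>();
        set.add(new CompactionKey("default", "t", "ds=1"));
        set.add(new CompactionKey("default", "t", "ds=1"));  // duplicate collapses
        System.out.println(set.size());  // prints 1
      }
    }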
Modified: hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java (original)
+++ hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java Thu Apr 10 22:33:46 2014
@@ -30,6 +30,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import static junit.framework.Assert.*;
@@ -317,7 +318,7 @@ public class TestCompactionTxnHandler {
txnHandler.commitTxn(new CommitTxnRequest(txnid));
assertEquals(0, txnHandler.numLocksInLockTable());
- List<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
assertEquals(2, potentials.size());
boolean sawMyTable = false, sawYourTable = false;
for (CompactionInfo ci : potentials) {
Modified: hive/branches/branch-0.13/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/pom.xml?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/pom.xml (original)
+++ hive/branches/branch-0.13/ql/pom.xml Thu Apr 10 22:33:46 2014
@@ -352,6 +352,13 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop-23.version}</version>
<optional>true</optional>
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Thu Apr 10 22:33:46 2014
@@ -51,18 +51,29 @@ public class AcidUtils {
public static final String DELTA_PREFIX = "delta_";
public static final String BUCKET_PREFIX = "bucket_";
- private static final String BUCKET_DIGITS = "%05d";
- private static final String DELTA_DIGITS = "%07d";
+ public static final String BUCKET_DIGITS = "%05d";
+ public static final String DELTA_DIGITS = "%07d";
private static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
+ public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
+ public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+
public static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
+
+ public static final PathFilter bucketFileFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(BUCKET_PREFIX);
+ }
+ };
+
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
/**
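
[Editorial aside] BUCKET_DIGIT_PATTERN and LEGACY_BUCKET_DIGIT_PATTERN above are made public so callers such as the compactor can recover a bucket number from either naming style: bucket_00042 ends in five digits, while legacy original files like 00042_0 begin with them. A sketch of how the two patterns might be applied (parseBucketId is an illustrative helper, not an AcidUtils method):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class BucketIdParser {
      // Same expressions as the patch: five digits anchored at the end vs. the start.
      static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
      static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");

      // Returns the bucket id encoded in a file name, or -1 if none is found.
      static int parseBucketId(String fileName) {
        Matcher m = BUCKET_DIGIT_PATTERN.matcher(fileName);
        if (!m.find()) {
          m = LEGACY_BUCKET_DIGIT_PATTERN.matcher(fileName);
          if (!m.find()) {
            return -1;
          }
        }
        return Integer.parseInt(m.group());
      }

      public static void main(String[] args) {
        System.out.println(parseBucketId("bucket_00042"));  // 42
        System.out.println(parseBucketId("00042_0"));       // 42
      }
    }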
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java Thu Apr 10 22:33:46 2014
@@ -118,12 +118,16 @@ public class RecordIdentifier implements
@Override
public void write(DataOutput dataOutput) throws IOException {
- throw new UnsupportedOperationException("Can't write RecordIdentifier");
+ dataOutput.writeLong(transactionId);
+ dataOutput.writeInt(bucketId);
+ dataOutput.writeLong(rowId);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
- throw new UnsupportedOperationException("Can't read RecordIdentifier");
+ transactionId = dataInput.readLong();
+ bucketId = dataInput.readInt();
+ rowId = dataInput.readLong();
}
@Override
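
[Editorial aside] With the UnsupportedOperationExceptions replaced, RecordIdentifier becomes a usable Writable: write() and readFields() mirror each other field for field (long transaction id, int bucket id, long row id), so the identifier can be shipped through Hadoop's serialization machinery. A round-trip sketch of that contract using plain java.io streams (the Ids class is an illustrative stand-in with the same three fields and wire order, not the real RecordIdentifier):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class WritableRoundTrip {
      // Stand-in matching the wire layout of the patched RecordIdentifier.
      static class Ids {
        long transactionId;
        int bucketId;
        long rowId;

        void write(DataOutputStream out) throws IOException {
          out.writeLong(transactionId);  // write order must match readFields exactly
          out.writeInt(bucketId);
          out.writeLong(rowId);
        }

        void readFields(DataInputStream in) throws IOException {
          transactionId = in.readLong();
          bucketId = in.readInt();
          rowId = in.readLong();
        }
      }

      public static void main(String[] args) throws IOException {
        Ids before = new Ids();
        before.transactionId = 100L;
        before.bucketId = 3;
        before.rowId = 7L;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bytes));

        Ids after = new Ids();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(after.transactionId + " " + after.bucketId + " " + after.rowId);
      }
    }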