Posted to commits@hive.apache.org by om...@apache.org on 2014/04/11 00:33:47 UTC

svn commit: r1586489 - in /hive/branches/branch-0.13: ./ metastore/src/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hive/metastore/txn/ metastore/src/test/org/apache/hadoop/hive/metastore/txn/ ql/ ql/src/java/org/apache/h...

Author: omalley
Date: Thu Apr 10 22:33:46 2014
New Revision: 1586489

URL: http://svn.apache.org/r1586489
Log:
HIVE-6319. Add compactor for ACID tables. (Alan Gates via omalley)

Added:
    hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
      - copied unchanged from r1586488, hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/txn/
      - copied from r1586488, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/
    hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/txn/
      - copied from r1586488, hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/
Modified:
    hive/branches/branch-0.13/   (props changed)
    hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    hive/branches/branch-0.13/ql/pom.xml
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java

Propchange: hive/branches/branch-0.13/
------------------------------------------------------------------------------
  Merged /hive/trunk:r1586488

Modified: hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Thu Apr 10 22:33:46 2014
@@ -42,6 +42,10 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 import org.apache.commons.cli.OptionBuilder;
@@ -5077,7 +5081,12 @@ public class HiveMetaStore extends Thrif
         }
       });
 
-      startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf);
+      Lock startLock = new ReentrantLock();
+      Condition startCondition = startLock.newCondition();
+      MetaStoreThread.BooleanPointer startedServing = new MetaStoreThread.BooleanPointer();
+      startMetaStoreThreads(conf, startLock, startCondition, startedServing);
+      startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf, startLock,
+          startCondition, startedServing);
     } catch (Throwable t) {
       // Catch the exception, log it and rethrow it.
       HMSHandler.LOG
@@ -5095,7 +5104,19 @@ public class HiveMetaStore extends Thrif
    */
   public static void startMetaStore(int port, HadoopThriftAuthBridge bridge)
       throws Throwable {
-    startMetaStore(port, bridge, new HiveConf(HMSHandler.class));
+    startMetaStore(port, bridge, new HiveConf(HMSHandler.class), null, null, null);
+  }
+
+  /**
+   * Start the metastore server.
+   * @param port
+   * @param bridge
+   * @param conf
+   * @throws Throwable
+   */
+  public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
+                                    HiveConf conf) throws Throwable {
+    startMetaStore(port, bridge, conf, null, null, null);
   }
 
   /**
@@ -5108,7 +5129,8 @@ public class HiveMetaStore extends Thrif
    * @throws Throwable
    */
   public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
-      HiveConf conf) throws Throwable {
+      HiveConf conf, Lock startLock, Condition startCondition,
+      MetaStoreThread.BooleanPointer startedServing) throws Throwable {
     try {
 
       // Server will create new threads up to max as necessary. After an idle
@@ -5176,6 +5198,10 @@ public class HiveMetaStore extends Thrif
       HMSHandler.LOG.info("Options.maxWorkerThreads = "
           + maxWorkerThreads);
       HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
+
+      if (startLock != null) {
+        signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
+      }
       tServer.serve();
     } catch (Throwable x) {
       x.printStackTrace();
@@ -5183,4 +5209,119 @@ public class HiveMetaStore extends Thrif
       throw x;
     }
   }
+
+  private static void signalOtherThreadsToStart(final TServer server, final Lock startLock,
+                                                final Condition startCondition,
+                                                final MetaStoreThread.BooleanPointer startedServing) {
+    // A simple thread to wait until the server has started and then signal the other threads to
+    // begin
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        do {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            LOG.warn("Signalling thread was interuppted: " + e.getMessage());
+          }
+        } while (!server.isServing());
+        startLock.lock();
+        try {
+          startedServing.boolVal = true;
+          startCondition.signalAll();
+        } finally {
+          startLock.unlock();
+        }
+      }
+    };
+    t.start();
+  }
+
+  /**
+   * Start threads outside of the thrift service, such as the compactor threads.
+   * @param conf Hive configuration object
+   */
+  private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock,
+                                            final Condition startCondition, final
+                                            MetaStoreThread.BooleanPointer startedServing) {
+    // A thread is spun up to start these other threads.  That's because we can't start them
+    // until after the TServer has started, but once TServer.serve is called we aren't given back
+    // control.
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        // This is a massive hack.  The compactor threads have to access packages in ql (such as
+        // AcidInputFormat).  ql depends on metastore so we can't directly access those.  To deal
+        // with this the compactor thread classes have been put in ql and they are instantiated here
+        // dynamically.  This is not ideal but it avoids a massive refactoring of Hive packages.
+        //
+        // Wrap the start of the threads in a catch Throwable block so that any failures
+        // don't doom the rest of the metastore.
+        startLock.lock();
+        try {
+          // Per the javadocs on Condition, do not depend on the condition alone as a start gate
+          // since spurious wake ups are possible.
+          while (!startedServing.boolVal) startCondition.await();
+          startCompactorInitiator(conf);
+          startCompactorWorkers(conf);
+          startCompactorCleaner(conf);
+        } catch (Throwable e) {
+          LOG.error("Failure when starting the compactor, compactions may not happen, " +
+              StringUtils.stringifyException(e));
+        } finally {
+          startLock.unlock();
+        }
+      }
+    };
+
+    t.start();
+  }
+
+  private static void startCompactorInitiator(HiveConf conf) throws Exception {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+      MetaStoreThread initiator =
+          instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
+      initializeAndStartThread(initiator, conf);
+    }
+  }
+
+  private static void startCompactorWorkers(HiveConf conf) throws Exception {
+    int numWorkers = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_THREADS);
+    for (int i = 0; i < numWorkers; i++) {
+      MetaStoreThread worker =
+          instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker");
+      initializeAndStartThread(worker, conf);
+    }
+  }
+
+  private static void startCompactorCleaner(HiveConf conf) throws Exception {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+      MetaStoreThread cleaner =
+          instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
+      initializeAndStartThread(cleaner, conf);
+    }
+  }
+
+  private static MetaStoreThread instantiateThread(String classname) throws Exception {
+    Class c = Class.forName(classname);
+    Object o = c.newInstance();
+    if (MetaStoreThread.class.isAssignableFrom(o.getClass())) {
+      return (MetaStoreThread)o;
+    } else {
+      String s = classname + " is not an instance of MetaStoreThread.";
+      LOG.error(s);
+      throw new IOException(s);
+    }
+  }
+
+  private static int nextThreadId = 1000000;
+
+  private static void initializeAndStartThread(MetaStoreThread thread, HiveConf conf) throws
+      MetaException {
+    LOG.info("Starting metastore thread of type " + thread.getClass().getName());
+    thread.setHiveConf(conf);
+    thread.setThreadId(nextThreadId++);
+    thread.init(new MetaStoreThread.BooleanPointer());
+    thread.start();
+  }
 }
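
A note on the Lock/Condition handshake above: it is a standard start gate. Waiters loop on the guarded boolean rather than trusting the condition alone, because Condition.await() may return spuriously, and the signaller sets the flag before calling signalAll(). Below is a minimal, self-contained sketch of the same pattern; it is not part of this commit, and BooleanPointer here is an illustrative stand-in for MetaStoreThread.BooleanPointer.

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class StartGateSketch {
      // Illustrative stand-in for MetaStoreThread.BooleanPointer.
      static class BooleanPointer {
        boolean boolVal = false;
      }

      public static void main(String[] args) throws InterruptedException {
        final Lock startLock = new ReentrantLock();
        final Condition startCondition = startLock.newCondition();
        final BooleanPointer startedServing = new BooleanPointer();

        // Waiter: re-checks the flag after every wakeup, since await() can
        // return without a signal.
        Thread waiter = new Thread(() -> {
          startLock.lock();
          try {
            while (!startedServing.boolVal) {
              startCondition.await();
            }
            System.out.println("server is up, background threads may start");
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            startLock.unlock();
          }
        });
        waiter.start();

        Thread.sleep(100); // stand-in for waiting until the TServer is serving
        startLock.lock();
        try {
          startedServing.boolVal = true; // set the flag first ...
          startCondition.signalAll();    // ... then wake every waiter
        } finally {
          startLock.unlock();
        }
        waiter.join();
      }
    }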

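The dynamic instantiation described in the "massive hack" comment is plain reflection: the metastore loads the ql compactor classes by name so that it never compiles against ql. A hedged sketch of the idea with invented names (WorkerThread stands in for MetaStoreThread, DemoWorker for a compactor class):

    import java.io.IOException;

    public class ReflectionLoadSketch {
      // Stand-in for the MetaStoreThread contract.
      interface WorkerThread {
        void start();
      }

      public static class DemoWorker implements WorkerThread {
        @Override
        public void start() {
          System.out.println("worker started");
        }
      }

      // Load a class by name and verify its type before casting, mirroring
      // instantiateThread() above.
      static WorkerThread instantiate(String classname) throws Exception {
        Object o = Class.forName(classname).getDeclaredConstructor().newInstance();
        if (o instanceof WorkerThread) {
          return (WorkerThread) o;
        }
        throw new IOException(classname + " is not an instance of WorkerThread.");
      }

      public static void main(String[] args) throws Exception {
        WorkerThread w = instantiate(DemoWorker.class.getName());
        w.start(); // prints: worker started
      }
    }
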
Modified: hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Thu Apr 10 22:33:46 2014
@@ -51,13 +51,14 @@ public class CompactionTxnHandler extend
    * @return set of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions, not actual ones.
    */
-  public List<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+  public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
     Connection dbConn = getDbConn();
-    List<CompactionInfo> response = new ArrayList<CompactionInfo>();
+    Set<CompactionInfo> response = new HashSet<CompactionInfo>();
     try {
       Statement stmt = dbConn.createStatement();
       // Check for completed transactions
-      String s = "select ctc_database, ctc_table, ctc_partition from COMPLETED_TXN_COMPONENTS";
+      String s = "select distinct ctc_database, ctc_table, " +
+          "ctc_partition from COMPLETED_TXN_COMPONENTS";
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
       while (rs.next()) {
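
The switch from List to Set removes duplicates at both layers: "select distinct" collapses duplicate rows in SQL, and the HashSet collapses any that remain in Java. The latter only works if CompactionInfo defines equals() and hashCode() over database, table, and partition; assuming it does, the effect is as in this small sketch with an illustrative stand-in class:

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;

    public class DedupSketch {
      // Illustrative stand-in for CompactionInfo, keyed on db/table/partition.
      static class CompactionKey {
        final String dbname, tableName, partName;

        CompactionKey(String dbname, String tableName, String partName) {
          this.dbname = dbname;
          this.tableName = tableName;
          this.partName = partName;
        }

        @Override
        public boolean equals(Object other) {
          if (!(other instanceof CompactionKey)) return false;
          CompactionKey o = (CompactionKey) other;
          return Objects.equals(dbname, o.dbname)
              && Objects.equals(tableName, o.tableName)
              && Objects.equals(partName, o.partName);
        }

        @Override
        public int hashCode() {
          return Objects.hash(dbname, tableName, partName);
        }
      }

      public static void main(String[] args) {
        Set<CompactionKey> potentials = new HashSet<>();
        potentials.add(new CompactionKey("default", "t1", "ds=1"));
        potentials.add(new CompactionKey("default", "t1", "ds=1")); // duplicate
        System.out.println(potentials.size()); // prints 1
      }
    }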

Modified: hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java (original)
+++ hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java Thu Apr 10 22:33:46 2014
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import static junit.framework.Assert.*;
 
@@ -317,7 +318,7 @@ public class TestCompactionTxnHandler {
     txnHandler.commitTxn(new CommitTxnRequest(txnid));
     assertEquals(0, txnHandler.numLocksInLockTable());
 
-    List<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
     assertEquals(2, potentials.size());
     boolean sawMyTable = false, sawYourTable = false;
     for (CompactionInfo ci : potentials) {

Modified: hive/branches/branch-0.13/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/pom.xml?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/pom.xml (original)
+++ hive/branches/branch-0.13/ql/pom.xml Thu Apr 10 22:33:46 2014
@@ -352,6 +352,13 @@
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+          <version>${hadoop-23.version}</version>
+          <optional>true</optional>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-hdfs</artifactId>
           <version>${hadoop-23.version}</version>
           <optional>true</optional>

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Thu Apr 10 22:33:46 2014
@@ -51,18 +51,29 @@ public class AcidUtils {
   public static final String DELTA_PREFIX = "delta_";
   public static final String BUCKET_PREFIX = "bucket_";
 
-  private static final String BUCKET_DIGITS = "%05d";
-  private static final String DELTA_DIGITS = "%07d";
+  public static final String BUCKET_DIGITS = "%05d";
+  public static final String DELTA_DIGITS = "%07d";
 
   private static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
 
+  public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
+  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+
   public static final PathFilter hiddenFileFilter = new PathFilter(){
     public boolean accept(Path p){
       String name = p.getName();
       return !name.startsWith("_") && !name.startsWith(".");
     }
   };
+
+  public static final PathFilter bucketFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(BUCKET_PREFIX);
+    }
+  };
+
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   /**
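
Making BUCKET_DIGITS and the digit patterns public lets callers outside AcidUtils (such as the new compactor code) format and parse bucket file names. A small sketch of how the pieces compose, using local copies of the constants rather than the real AcidUtils:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class BucketNameSketch {
      static final String BUCKET_PREFIX = "bucket_";
      static final String BUCKET_DIGITS = "%05d";
      // Matches the five-digit bucket id at the end of a file name.
      static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");

      public static void main(String[] args) {
        // Format: zero-pad the bucket id to five digits.
        String name = BUCKET_PREFIX + String.format(BUCKET_DIGITS, 42);
        System.out.println(name); // bucket_00042

        // Parse: pull the trailing five digits back out.
        Matcher m = BUCKET_DIGIT_PATTERN.matcher(name);
        if (m.find()) {
          System.out.println(Integer.parseInt(m.group())); // 42
        }
      }
    }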

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java Thu Apr 10 22:33:46 2014
@@ -118,12 +118,16 @@ public class RecordIdentifier implements
 
   @Override
   public void write(DataOutput dataOutput) throws IOException {
-    throw new UnsupportedOperationException("Can't write RecordIdentifier");
+    dataOutput.writeLong(transactionId);
+    dataOutput.writeInt(bucketId);
+    dataOutput.writeLong(rowId);
   }
 
   @Override
   public void readFields(DataInput dataInput) throws IOException {
-    throw new UnsupportedOperationException("Can't read RecordIdentifier");
+    transactionId = dataInput.readLong();
+    bucketId = dataInput.readInt();
+    rowId = dataInput.readLong();
   }
 
   @Override
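
With write() and readFields() implemented, a RecordIdentifier can round-trip through Hadoop's Writable plumbing instead of throwing. A self-contained sketch of the same (transactionId, bucketId, rowId) field layout, using an illustrative stand-in class rather than the real RecordIdentifier:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class RoundTripSketch {
      // Stand-in mirroring the fields the patch serializes.
      static class RecordId {
        long transactionId;
        int bucketId;
        long rowId;

        void write(DataOutput out) throws IOException {
          out.writeLong(transactionId);
          out.writeInt(bucketId);
          out.writeLong(rowId);
        }

        void readFields(DataInput in) throws IOException {
          transactionId = in.readLong();
          bucketId = in.readInt();
          rowId = in.readLong();
        }
      }

      public static void main(String[] args) throws IOException {
        RecordId original = new RecordId();
        original.transactionId = 7L;
        original.bucketId = 3;
        original.rowId = 99L;

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        RecordId copy = new RecordId();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.transactionId + " " + copy.bucketId + " " + copy.rowId); // 7 3 99
      }
    }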