Posted to commits@hive.apache.org by om...@apache.org on 2014/04/11 00:32:21 UTC

svn commit: r1586488 [1/2] - in /hive/trunk: metastore/src/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hive/metastore/txn/ metastore/src/test/org/apache/hadoop/hive/metastore/txn/ ql/ ql/src/java/org/apache/hadoop/hive/q...

Author: omalley
Date: Thu Apr 10 22:32:20 2014
New Revision: 1586488

URL: http://svn.apache.org/r1586488
Log:
HIVE-6319. Add compactor for ACID tables. (Alan Gates via omalley)

Added:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
Modified:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    hive/trunk/ql/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1586488&r1=1586487&r2=1586488&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Thu Apr 10 22:32:20 2014
@@ -42,6 +42,10 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 import org.apache.commons.cli.OptionBuilder;
@@ -5074,7 +5078,12 @@ public class HiveMetaStore extends Thrif
         }
       });
 
-      startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf);
+      Lock startLock = new ReentrantLock();
+      Condition startCondition = startLock.newCondition();
+      MetaStoreThread.BooleanPointer startedServing = new MetaStoreThread.BooleanPointer();
+      startMetaStoreThreads(conf, startLock, startCondition, startedServing);
+      startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf, startLock,
+          startCondition, startedServing);
     } catch (Throwable t) {
       // Catch the exception, log it and rethrow it.
       HMSHandler.LOG
@@ -5092,7 +5101,19 @@ public class HiveMetaStore extends Thrif
    */
   public static void startMetaStore(int port, HadoopThriftAuthBridge bridge)
       throws Throwable {
-    startMetaStore(port, bridge, new HiveConf(HMSHandler.class));
+    startMetaStore(port, bridge, new HiveConf(HMSHandler.class), null, null, null);
+  }
+
+  /**
+   * Start the metastore.
+   * @param port
+   * @param bridge
+   * @param conf
+   * @throws Throwable
+   */
+  public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
+                                    HiveConf conf) throws Throwable {
+    startMetaStore(port, bridge, conf, null, null, null);
   }
 
   /**
@@ -5105,7 +5126,8 @@ public class HiveMetaStore extends Thrif
    * @throws Throwable
    */
   public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
-      HiveConf conf) throws Throwable {
+      HiveConf conf, Lock startLock, Condition startCondition,
+      MetaStoreThread.BooleanPointer startedServing) throws Throwable {
     try {
 
       // Server will create new threads up to max as necessary. After an idle
@@ -5173,6 +5195,10 @@ public class HiveMetaStore extends Thrif
       HMSHandler.LOG.info("Options.maxWorkerThreads = "
           + maxWorkerThreads);
       HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
+
+      if (startLock != null) {
+        signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
+      }
       tServer.serve();
     } catch (Throwable x) {
       x.printStackTrace();
@@ -5180,4 +5206,119 @@ public class HiveMetaStore extends Thrif
       throw x;
     }
   }
+
+  private static void signalOtherThreadsToStart(final TServer server, final Lock startLock,
+                                                final Condition startCondition,
+                                                final MetaStoreThread.BooleanPointer startedServing) {
+    // A simple thread to wait until the server has started and then signal the other threads to
+    // begin
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        do {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            LOG.warn("Signalling thread was interuppted: " + e.getMessage());
+          }
+        } while (!server.isServing());
+        startLock.lock();
+        try {
+          startedServing.boolVal = true;
+          startCondition.signalAll();
+        } finally {
+          startLock.unlock();
+        }
+      }
+    };
+    t.start();
+  }
+
+  /**
+   * Start threads outside of the thrift service, such as the compactor threads.
+   * @param conf Hive configuration object
+   */
+  private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock,
+                                            final Condition startCondition, final
+                                            MetaStoreThread.BooleanPointer startedServing) {
+    // A thread is spun up to start these other threads.  That's because we can't start them
+    // until after the TServer has started, but once TServer.serve is called we aren't given back
+    // control.
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        // This is a massive hack.  The compactor threads have to access packages in ql (such as
+        // AcidInputFormat).  ql depends on metastore so we can't directly access those.  To deal
+        // with this the compactor thread classes have been put in ql and they are instantiated here
+        // dynamically.  This is not ideal but it avoids a massive refactoring of Hive packages.
+        //
+        // Wrap the start of the threads in a catch Throwable block so that any failures
+        // don't doom the rest of the metastore.
+        startLock.lock();
+        try {
+          // Per the javadocs on Condition, do not depend on the condition alone as a start gate
+          // since spurious wake ups are possible.
+          while (!startedServing.boolVal) startCondition.await();
+          startCompactorInitiator(conf);
+          startCompactorWorkers(conf);
+          startCompactorCleaner(conf);
+        } catch (Throwable e) {
+          LOG.error("Failure when starting the compactor, compactions may not happen, " +
+              StringUtils.stringifyException(e));
+        } finally {
+          startLock.unlock();
+        }
+      }
+    };
+
+    t.start();
+  }
+
+  private static void startCompactorInitiator(HiveConf conf) throws Exception {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+      MetaStoreThread initiator =
+          instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
+      initializeAndStartThread(initiator, conf);
+    }
+  }
+
+  private static void startCompactorWorkers(HiveConf conf) throws Exception {
+    int numWorkers = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_THREADS);
+    for (int i = 0; i < numWorkers; i++) {
+      MetaStoreThread worker =
+          instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker");
+      initializeAndStartThread(worker, conf);
+    }
+  }
+
+  private static void startCompactorCleaner(HiveConf conf) throws Exception {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+      MetaStoreThread cleaner =
+          instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
+      initializeAndStartThread(cleaner, conf);
+    }
+  }
+
+  private static MetaStoreThread instantiateThread(String classname) throws Exception {
+    Class c = Class.forName(classname);
+    Object o = c.newInstance();
+    if (MetaStoreThread.class.isAssignableFrom(o.getClass())) {
+      return (MetaStoreThread)o;
+    } else {
+      String s = classname + " is not an instance of MetaStoreThread.";
+      LOG.error(s);
+      throw new IOException(s);
+    }
+  }
+
+  private static int nextThreadId = 1000000;
+
+  private static void initializeAndStartThread(MetaStoreThread thread, HiveConf conf) throws
+      MetaException {
+    LOG.info("Starting metastore thread of type " + thread.getClass().getName());
+    thread.setHiveConf(conf);
+    thread.setThreadId(nextThreadId++);
+    thread.init(new MetaStoreThread.BooleanPointer());
+    thread.start();
+  }
 }
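
The signalling above is the standard Lock/Condition start-gate pattern: set the flag while holding the lock, signal, and have the waiter loop on the flag.  A minimal standalone sketch of the same pattern (not part of this commit; the class name and the sleep that stands in for polling TServer.isServing() are illustrative):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class StartGateSketch {
  static final Lock startLock = new ReentrantLock();
  static final Condition startCondition = startLock.newCondition();
  static boolean startedServing = false;   // plays the role of MetaStoreThread.BooleanPointer

  public static void main(String[] args) throws InterruptedException {
    Thread waiter = new Thread() {
      @Override
      public void run() {
        startLock.lock();
        try {
          // Loop on the flag, per the Condition javadoc, to guard against spurious wakeups.
          while (!startedServing) startCondition.await();
          System.out.println("background threads may start now");
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          startLock.unlock();
        }
      }
    };
    waiter.start();

    Thread.sleep(100);   // stand-in for waiting until the server is serving
    startLock.lock();
    try {
      startedServing = true;      // set the flag while holding the lock, then signal
      startCondition.signalAll();
    } finally {
      startLock.unlock();
    }
    waiter.join();
  }
}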

Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java (added)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * A thread that runs in the metastore, separate from the threads in the thrift service.
+ */
+public interface MetaStoreThread {
+
+  /**
+   * Set the Hive configuration for this thread.
+   * @param conf
+   */
+  void setHiveConf(HiveConf conf);
+
+  /**
+   * Set the id for this thread.
+   * @param threadId
+   */
+  void setThreadId(int threadId);
+
+  /**
+   * Initialize the thread.  This must not be called until after
+   * {@link #setHiveConf(org.apache.hadoop.hive.conf.HiveConf)} and  {@link #setThreadId(int)}
+   * have been called.
+   * @param stop a flag to watch for when to stop.  If this value is set to true,
+   *             the thread will terminate the next time through its main loop.
+   */
+  void init(BooleanPointer stop) throws MetaException;
+
+  /**
+   * Run the thread in the background.  This must not be called until
+   * {@link #init(org.apache.hadoop.hive.metastore.MetaStoreThread.BooleanPointer)} has
+   * been called.
+   */
+  void start();
+
+  class BooleanPointer {
+    public boolean boolVal;
+
+    public BooleanPointer() {
+      boolVal = false;
+    }
+  }
+
+}
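
The intended lifecycle for implementations is setHiveConf(), setThreadId(), init(), then start(), with the BooleanPointer passed to init() watched as the stop flag.  A minimal hypothetical implementation sketch (not part of this commit; the class name and loop body are made up):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class ExampleMetaStoreThread extends Thread implements MetaStoreThread {
  private HiveConf conf;
  private int threadId;
  private BooleanPointer stop;

  @Override
  public void setHiveConf(HiveConf conf) { this.conf = conf; }

  @Override
  public void setThreadId(int threadId) { this.threadId = threadId; }

  @Override
  public void init(BooleanPointer stop) throws MetaException {
    this.stop = stop;
    setDaemon(true);   // do not keep the metastore process alive on shutdown
  }

  @Override
  public void run() {
    while (!stop.boolVal) {
      // do one unit of background work per pass, then sleep before rechecking the stop flag
      try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
    }
  }
}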

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1586488&r1=1586487&r2=1586488&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Thu Apr 10 22:32:20 2014
@@ -51,13 +51,14 @@ public class CompactionTxnHandler extend
    * @return list of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
-  public List<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+  public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
     Connection dbConn = getDbConn();
-    List<CompactionInfo> response = new ArrayList<CompactionInfo>();
+    Set<CompactionInfo> response = new HashSet<CompactionInfo>();
     try {
       Statement stmt = dbConn.createStatement();
       // Check for completed transactions
-      String s = "select ctc_database, ctc_table, ctc_partition from COMPLETED_TXN_COMPONENTS";
+      String s = "select distinct ctc_database, ctc_table, " +
+          "ctc_partition from COMPLETED_TXN_COMPONENTS";
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
       while (rs.next()) {

Modified: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java?rev=1586488&r1=1586487&r2=1586488&view=diff
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java Thu Apr 10 22:32:20 2014
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import static junit.framework.Assert.*;
 
@@ -317,7 +318,7 @@ public class TestCompactionTxnHandler {
     txnHandler.commitTxn(new CommitTxnRequest(txnid));
     assertEquals(0, txnHandler.numLocksInLockTable());
 
-    List<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
     assertEquals(2, potentials.size());
     boolean sawMyTable = false, sawYourTable = false;
     for (CompactionInfo ci : potentials) {

Modified: hive/trunk/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/pom.xml?rev=1586488&r1=1586487&r2=1586488&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/trunk/ql/pom.xml Thu Apr 10 22:32:20 2014
@@ -352,6 +352,13 @@
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+          <version>${hadoop-23.version}</version>
+          <optional>true</optional>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-hdfs</artifactId>
           <version>${hadoop-23.version}</version>
           <optional>true</optional>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1586488&r1=1586487&r2=1586488&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Thu Apr 10 22:32:20 2014
@@ -51,18 +51,29 @@ public class AcidUtils {
   public static final String DELTA_PREFIX = "delta_";
   public static final String BUCKET_PREFIX = "bucket_";
 
-  private static final String BUCKET_DIGITS = "%05d";
-  private static final String DELTA_DIGITS = "%07d";
+  public static final String BUCKET_DIGITS = "%05d";
+  public static final String DELTA_DIGITS = "%07d";
 
   private static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
 
+  public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
+  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+
   public static final PathFilter hiddenFileFilter = new PathFilter(){
     public boolean accept(Path p){
       String name = p.getName();
       return !name.startsWith("_") && !name.startsWith(".");
     }
   };
+
+  public static final PathFilter bucketFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(BUCKET_PREFIX);
+    }
+  };
+
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   /**

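Together these constants describe bucket file naming: BUCKET_PREFIX followed by five zero-padded digits, with BUCKET_DIGIT_PATTERN recovering the bucket number from the end of the name.  A small standalone sketch (constants copied from above; the example file name is hypothetical):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketNameSketch {
  static final String BUCKET_PREFIX = "bucket_";
  static final String BUCKET_DIGITS = "%05d";
  static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");

  public static void main(String[] args) {
    String name = String.format(BUCKET_PREFIX + BUCKET_DIGITS, 3);   // "bucket_00003"
    Matcher m = BUCKET_DIGIT_PATTERN.matcher(name);
    if (m.find()) {
      System.out.println(Integer.valueOf(m.group()));   // prints 3
    }
  }
}
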
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java?rev=1586488&r1=1586487&r2=1586488&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java Thu Apr 10 22:32:20 2014
@@ -118,12 +118,16 @@ public class RecordIdentifier implements
 
   @Override
   public void write(DataOutput dataOutput) throws IOException {
-    throw new UnsupportedOperationException("Can't write RecordIdentifier");
+    dataOutput.writeLong(transactionId);
+    dataOutput.writeInt(bucketId);
+    dataOutput.writeLong(rowId);
   }
 
   @Override
   public void readFields(DataInput dataInput) throws IOException {
-    throw new UnsupportedOperationException("Can't read RecordIdentifier");
+    transactionId = dataInput.readLong();
+    bucketId = dataInput.readInt();
+    rowId = dataInput.readLong();
   }
 
   @Override

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A class to clean directories after compactions.  This will run in a separate thread.
+ */
+public class Cleaner extends CompactorThread {
+  static final private String CLASS_NAME = Cleaner.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  private long cleanerCheckInterval = 5000;
+
+  @Override
+  public void run() {
+    // Make sure nothing escapes this run method and kills the metastore at large,
+    // so wrap it in a big catch Throwable statement.
+    do {
+      try {
+        long startedAt = System.currentTimeMillis();
+
+        // Now look for new entries ready to be cleaned.
+        List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+        for (CompactionInfo ci : toClean) {
+          LockComponent comp = null;
+          comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname);
+          comp.setTablename(ci.tableName);
+          if (ci.partName != null)  comp.setPartitionname(ci.partName);
+          List<LockComponent> components = new ArrayList<LockComponent>(1);
+          components.add(comp);
+          LockRequest rqst = new LockRequest(components, System.getProperty("user.name"),
+              Worker.hostname());
+          LockResponse rsp = txnHandler.lockNoWait(rqst);
+          try {
+            if (rsp.getState() == LockState.ACQUIRED) {
+              clean(ci);
+            }
+          } finally {
+            if (rsp.getState() == LockState.ACQUIRED) {
+              txnHandler.unlock(new UnlockRequest(rsp.getLockid()));
+            }
+          }
+        }
+
+        // Now, go back to bed until it's time to do this again
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime >= cleanerCheckInterval || stop.boolVal)  continue;
+        else Thread.sleep(cleanerCheckInterval - elapsedTime);
+      } catch (Throwable t) {
+        LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+            StringUtils.stringifyException(t));
+      }
+    } while (!stop.boolVal);
+  }
+
+  private void clean(CompactionInfo ci) throws MetaException {
+    LOG.info("Starting cleaning for " + ci.getFullPartitionName());
+    try {
+      StorageDescriptor sd = resolveStorageDescriptor(resolveTable(ci), resolvePartition(ci));
+      final String location = sd.getLocation();
+
+      // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
+      // transactions.  This assures that all deltas are treated as valid and all we return are
+      // obsolete files.
+      final ValidTxnList txnList = new ValidTxnListImpl();
+
+      if (runJobAsSelf(ci.runAs)) {
+        removeFiles(location, txnList);
+      } else {
+        LOG.info("Cleaning as user " + ci.runAs);
+        UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
+            UserGroupInformation.getLoginUser());
+        ugi.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            removeFiles(location, txnList);
+            return null;
+          }
+        });
+      }
+
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning " +
+          StringUtils.stringifyException(e));
+    } finally {
+      // We need to clean this out one way or another.
+      txnHandler.markCleaned(ci);
+    }
+  }
+
+  private void removeFiles(String location, ValidTxnList txnList) throws IOException {
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList);
+    List<FileStatus> obsoleteDirs = dir.getObsolete();
+    List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
+    for (FileStatus stat : obsoleteDirs) {
+      filesToDelete.add(stat.getPath());
+    }
+    if (filesToDelete.size() < 1) {
+      LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
+          ", that hardly seems right.");
+      return;
+    }
+    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+
+    for (Path dead : filesToDelete) {
+      LOG.debug("Doing to delete path " + dead.toString());
+      fs.delete(dead, true);
+    }
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,719 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+
+/**
+ * Class to do compactions via an MR job.  This has to be in the ql package rather than metastore
+ * .compactions package with all of its relatives because it needs access to the actual input
+ * and output formats, which are in ql.  ql depends on metastore and we can't have a circular
+ * dependency.
+ */
+public class CompactorMR {
+
+  static final private String CLASS_NAME = CompactorMR.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  static final private String INPUT_FORMAT_CLASS_NAME = "hive.compactor.input.format.class.name";
+  static final private String OUTPUT_FORMAT_CLASS_NAME = "hive.compactor.output.format.class.name";
+  static final private String TMP_LOCATION = "hive.compactor.input.tmp.dir";
+  static final private String FINAL_LOCATION = "hive.compactor.input.dir";
+  static final private String MIN_TXN = "hive.compactor.txn.min";
+  static final private String MAX_TXN = "hive.compactor.txn.max";
+  static final private String IS_MAJOR = "hive.compactor.is.major";
+  static final private String IS_COMPRESSED = "hive.compactor.is.compressed";
+  static final private String TABLE_PROPS = "hive.compactor.table.props";
+  static final private String NUM_BUCKETS = "hive.compactor.num.buckets";
+  static final private String BASE_DIR = "hive.compactor.base.dir";
+  static final private String DELTA_DIRS = "hive.compactor.delta.dirs";
+  static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search";
+  static final private String TMPDIR = "_tmp";
+
+  public CompactorMR() {
+  }
+
+  /**
+   * Run a compactor job.
+   * @param conf Hive configuration file
+   * @param jobName name to run this job with
+   * @param t metastore table
+   * @param sd metastore storage descriptor
+   * @param txns list of valid transactions
+   * @param isMajor is this a major compaction?
+   * @throws java.io.IOException if the job fails
+   */
+  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
+           ValidTxnList txns, boolean isMajor) throws IOException {
+    JobConf job = new JobConf(conf);
+    job.setJobName(jobName);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setJarByClass(CompactorMR.class);
+    LOG.debug("User jar set to " + job.getJar());
+    job.setMapperClass(CompactorMap.class);
+    job.setNumReduceTasks(0);
+    job.setInputFormat(CompactorInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setOutputCommitter(CompactorOutputCommitter.class);
+
+    job.set(FINAL_LOCATION, sd.getLocation());
+    job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
+    job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
+    job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
+    job.setBoolean(IS_MAJOR, isMajor);
+    job.setBoolean(IS_COMPRESSED, sd.isCompressed());
+    job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
+    job.setInt(NUM_BUCKETS, sd.getBucketColsSize());
+    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    setColumnTypes(job, sd.getCols());
+
+    // Figure out and encode what files we need to read.  We do this here (rather than in
+    // getSplits below) because as part of this we discover our minimum and maximum transactions,
+    // and discovering that in getSplits is too late as we then have no way to pass it to our
+    // mapper.
+
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+    StringableList dirsToSearch = new StringableList();
+    Path baseDir = null;
+    if (isMajor) {
+      // There may not be a base dir if the partition was empty before inserts or if this
+      // partition is just now being converted to ACID.
+      baseDir = dir.getBaseDirectory();
+      if (baseDir == null) {
+        List<FileStatus> originalFiles = dir.getOriginalFiles();
+        if (!(originalFiles == null) && !(originalFiles.size() == 0)) {
+          // There are original format files
+          for (FileStatus stat : originalFiles) {
+            dirsToSearch.add(stat.getPath());
+            LOG.debug("Adding original file " + stat.getPath().toString() + " to dirs to search");
+          }
+          // Set base to the location so that the input format reads the original files.
+          baseDir = new Path(sd.getLocation());
+        }
+      } else {
+        // add our base to the list of directories to search for files in.
+        LOG.debug("Adding base directory " + baseDir + " to dirs to search");
+        dirsToSearch.add(baseDir);
+      }
+    }
+
+    List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
+
+    if (parsedDeltas == null || parsedDeltas.size() == 0) {
+      // Seriously, no deltas?  Can't compact that.
+      LOG.error("No delta files found to compact in " + sd.getLocation());
+      return;
+    }
+
+    StringableList deltaDirs = new StringableList();
+    long minTxn = Long.MAX_VALUE;
+    long maxTxn = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta delta : parsedDeltas) {
+      LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
+      dirsToSearch.add(delta.getPath());
+      deltaDirs.add(delta.getPath());
+      minTxn = Math.min(minTxn, delta.getMinTransaction());
+      maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
+    }
+
+    if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
+    job.set(DELTA_DIRS, deltaDirs.toString());
+    job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
+    job.setLong(MIN_TXN, minTxn);
+    job.setLong(MAX_TXN, maxTxn);
+    LOG.debug("Setting minimum transaction to " + minTxn);
+    LOG.debug("Setting maximume transaction to " + maxTxn);
+
+    JobClient.runJob(job).waitForCompletion();
+  }
+
+  /**
+   * Set the column names and types into the job conf for the input format
+   * to use.
+   * @param job the job to update
+   * @param cols the columns of the table
+   */
+  private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
+    StringBuilder colNames = new StringBuilder();
+    StringBuilder colTypes = new StringBuilder();
+    boolean isFirst = true;
+    for(FieldSchema col: cols) {
+      if (isFirst) {
+        isFirst = false;
+      } else {
+        colNames.append(',');
+        colTypes.append(',');
+      }
+      colNames.append(col.getName());
+      colTypes.append(col.getType());
+    }
+    job.set(serdeConstants.LIST_COLUMNS, colNames.toString());
+    job.set(serdeConstants.LIST_COLUMN_TYPES, colTypes.toString());
+  }
+
+  static class CompactorInputSplit implements InputSplit {
+    private long length = 0;
+    private List<String> locations;
+    private int bucketNum;
+    private Path base;
+    private Path[] deltas;
+
+    public CompactorInputSplit() {
+    }
+
+    /**
+     *
+     * @param hadoopConf
+     * @param bucket bucket to be processed by this split
+     * @param files actual files this split should process.  It is assumed the caller has already
+     *              parsed out the files in base and deltas to populate this list.
+     * @param base directory of the base, or the partition/table location if the files are in old
+     *             style.  Can be null.
+     * @param deltas directories of the delta files.
+     * @throws IOException
+     */
+    CompactorInputSplit(Configuration hadoopConf, int bucket, List<Path> files, Path base,
+                               Path[] deltas)
+        throws IOException {
+      bucketNum = bucket;
+      this.base = base;
+      this.deltas = deltas;
+      locations = new ArrayList<String>();
+
+      for (Path path : files) {
+        FileSystem fs = path.getFileSystem(hadoopConf);
+        FileStatus stat = fs.getFileStatus(path);
+        length += stat.getLen();
+        BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, length);
+        for (int i = 0; i < locs.length; i++) {
+          String[] hosts = locs[i].getHosts();
+          for (int j = 0; j < hosts.length; j++) {
+            locations.add(hosts[j]);
+          }
+        }
+      }
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return length;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return locations.toArray(new String[locations.size()]);
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      dataOutput.writeLong(length);
+      dataOutput.writeInt(locations.size());
+      for (int i = 0; i < locations.size(); i++) {
+        dataOutput.writeInt(locations.get(i).length());
+        dataOutput.writeBytes(locations.get(i));
+      }
+      dataOutput.writeInt(bucketNum);
+      if (base == null) {
+        dataOutput.writeInt(0);
+      } else {
+        dataOutput.writeInt(base.toString().length());
+        dataOutput.writeBytes(base.toString());
+      }
+      dataOutput.writeInt(deltas.length);
+      for (int i = 0; i < deltas.length; i++) {
+        dataOutput.writeInt(deltas[i].toString().length());
+        dataOutput.writeBytes(deltas[i].toString());
+      }
+
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      int len;
+      byte[] buf;
+
+      locations = new ArrayList<String>();
+      length = dataInput.readLong();
+      LOG.debug("Read length of " + length);
+      int numElements = dataInput.readInt();
+      LOG.debug("Read numElements of " + numElements);
+      for (int i = 0; i < numElements; i++) {
+        len = dataInput.readInt();
+        LOG.debug("Read file length of " + len);
+        buf = new byte[len];
+        dataInput.readFully(buf);
+        locations.add(new String(buf));
+      }
+      bucketNum = dataInput.readInt();
+      LOG.debug("Read bucket number of " + bucketNum);
+      len = dataInput.readInt();
+      LOG.debug("Read base path length of " + len);
+      if (len > 0) {
+        buf = new byte[len];
+        dataInput.readFully(buf);
+        base = new Path(new String(buf));
+      }
+      numElements = dataInput.readInt();
+      deltas = new Path[numElements];
+      for (int i = 0; i < numElements; i++) {
+        len = dataInput.readInt();
+        buf = new byte[len];
+        dataInput.readFully(buf);
+        deltas[i] = new Path(new String(buf));
+      }
+    }
+
+    public void set(CompactorInputSplit other) {
+      length = other.length;
+      locations = other.locations;
+      bucketNum = other.bucketNum;
+      base = other.base;
+      deltas = other.deltas;
+    }
+
+    int getBucket() {
+      return bucketNum;
+    }
+
+    Path getBaseDir() {
+      return base;
+    }
+
+    Path[] getDeltaDirs() {
+      return deltas;
+    }
+
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("CompactorInputSplit{base: ");
+      builder.append(base);
+      builder.append(", bucket: ");
+      builder.append(bucketNum);
+      builder.append(", length: ");
+      builder.append(length);
+      builder.append(", deltas: [");
+      for(int i=0; i < deltas.length; ++i) {
+        if (i != 0) {
+          builder.append(", ");
+        }
+        builder.append(deltas[i].getName());
+      }
+      builder.append("]}");
+      return builder.toString();
+    }
+  }
+
+  /**
+   * This input format returns its own input split as a value.  This is because our splits
+   * contain information needed to properly construct the writer.  Crazy, huh?
+   */
+  static class CompactorInputFormat implements InputFormat<NullWritable, CompactorInputSplit> {
+
+    @Override
+    public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
+      Path baseDir = null;
+      if (entries.get(BASE_DIR) != null) baseDir = new Path(entries.get(BASE_DIR));
+      StringableList tmpDeltaDirs = new StringableList(entries.get(DELTA_DIRS));
+      Path[] deltaDirs = tmpDeltaDirs.toArray(new Path[tmpDeltaDirs.size()]);
+      StringableList dirsToSearch = new StringableList(entries.get(DIRS_TO_SEARCH));
+      Map<Integer, BucketTracker> splitToBucketMap = new HashMap<Integer, BucketTracker>();
+      for (Path dir : dirsToSearch) {
+        FileSystem fs = dir.getFileSystem(entries);
+
+        // If this is a base or delta directory, then we need to be looking for the bucket files.
+        // But if it's a legacy file then we need to add it directly.
+        if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
+            dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+          boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+          FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
+          for (int j = 0; j < files.length; j++) {
+            // For each file, figure out which bucket it is.
+            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(files[j].getPath().getName());
+            addFileToMap(matcher, files[j].getPath(), sawBase, splitToBucketMap);
+          }
+        } else {
+          // Legacy file, see if it's a bucket file
+          Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName());
+          addFileToMap(matcher, dir, true, splitToBucketMap);
+        }
+      }
+      List<InputSplit> splits = new ArrayList<InputSplit>(splitToBucketMap.size());
+      for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) {
+        BucketTracker bt = e.getValue();
+        splits.add(new CompactorInputSplit(entries, e.getKey(), bt.buckets,
+            bt.sawBase ? baseDir : null, deltaDirs));
+      }
+
+      LOG.debug("Returning " + splits.size() + " splits");
+      return splits.toArray(new InputSplit[splits.size()]);
+    }
+
+    @Override
+    public RecordReader<NullWritable, CompactorInputSplit> getRecordReader(
+        InputSplit inputSplit,  JobConf entries, Reporter reporter) throws IOException {
+      return new CompactorRecordReader((CompactorInputSplit)inputSplit);
+    }
+
+    private void addFileToMap(Matcher matcher, Path file, boolean sawBase,
+                              Map<Integer, BucketTracker> splitToBucketMap) {
+      if (!matcher.find()) {
+        LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
+            file.toString());
+      }
+      int bucketNum = Integer.valueOf(matcher.group());
+      BucketTracker bt = splitToBucketMap.get(bucketNum);
+      if (bt == null) {
+        bt = new BucketTracker();
+        splitToBucketMap.put(bucketNum, bt);
+      }
+      LOG.debug("Adding " + file.toString() + " to list of files for splits");
+      bt.buckets.add(file);
+      bt.sawBase |= sawBase;
+    }
+
+    private static class BucketTracker {
+      BucketTracker() {
+        sawBase = false;
+        buckets = new ArrayList<Path>();
+      }
+
+      boolean sawBase;
+      List<Path> buckets;
+    }
+  }
+
+  static class CompactorRecordReader
+      implements RecordReader<NullWritable, CompactorInputSplit> {
+    private CompactorInputSplit split;
+
+    CompactorRecordReader(CompactorInputSplit split) {
+      this.split = split;
+    }
+
+    @Override
+    public boolean next(NullWritable key,
+                        CompactorInputSplit compactorInputSplit) throws IOException {
+      if (split != null) {
+        compactorInputSplit.set(split);
+        split = null;
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public CompactorInputSplit createValue() {
+      return new CompactorInputSplit();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+  }
+
+  static class CompactorMap<V extends Writable>
+      implements Mapper<NullWritable, CompactorInputSplit,  NullWritable,  NullWritable> {
+
+    JobConf jobConf;
+    FSRecordWriter writer;
+
+    @Override
+    public void map(NullWritable key, CompactorInputSplit split,
+                    OutputCollector<NullWritable, NullWritable> nullWritableVOutputCollector,
+                    Reporter reporter) throws IOException {
+      // This will only get called once, since CompactorRecordReader only returns one record,
+      // the input split.
+      // Based on the split we're passed we go instantiate the real reader and then iterate on it
+      // until it finishes.
+      AcidInputFormat aif =
+          instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
+      ValidTxnList txnList =
+          new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+
+      AcidInputFormat.RawReader<V> reader =
+          aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(),
+              txnList, split.getBaseDir(), split.getDeltaDirs());
+      RecordIdentifier identifier = reader.createKey();
+      V value = reader.createValue();
+      getWriter(reporter, reader.getObjectInspector(), split.getBucket());
+      while (reader.next(identifier, value)) {
+        writer.write(value);
+        reporter.progress();
+      }
+    }
+
+    @Override
+    public void configure(JobConf entries) {
+      jobConf = entries;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (writer != null) {
+        writer.close(false);
+      }
+    }
+
+    private void getWriter(Reporter reporter, ObjectInspector inspector,
+                           int bucket) throws IOException {
+      if (writer == null) {
+        AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+        options.inspector(inspector)
+            .writingBase(jobConf.getBoolean(IS_MAJOR, false))
+            .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+            .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
+            .reporter(reporter)
+            .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+            .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+            .bucket(bucket);
+
+        // Instantiate the underlying output format
+        AcidOutputFormat<V> aof =
+            instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
+
+        writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
+      }
+    }
+
+  }
+
+  static class StringableMap extends HashMap<String, String> {
+
+    StringableMap(String s) {
+      String[] parts = s.split(":", 2);
+      // the first field is the number of entries to read
+      int numElements = Integer.valueOf(parts[0]);
+      s = parts[1];
+      for (int i = 0; i < numElements; i++) {
+        parts = s.split(":", 2);
+        int len = Integer.valueOf(parts[0]);
+        String key = null;
+        if (len > 0) key = parts[1].substring(0, len);
+        parts = parts[1].substring(len).split(":", 2);
+        len = Integer.valueOf(parts[0]);
+        String value = null;
+        if (len > 0) value = parts[1].substring(0, len);
+        s = parts[1].substring(len);
+        put(key, value);
+      }
+    }
+
+    StringableMap(Map<String, String> m) {
+      super(m);
+    }
+
+    @Override
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append(size());
+      buf.append(':');
+      if (size() > 0) {
+        for (Map.Entry<String, String> entry : entrySet()) {
+          int length = (entry.getKey() == null) ? 0 : entry.getKey().length();
+          buf.append(entry.getKey() == null ? 0 : length);
+          buf.append(':');
+          if (length > 0) buf.append(entry.getKey());
+          length = (entry.getValue() == null) ? 0 : entry.getValue().length();
+          buf.append(length);
+          buf.append(':');
+          if (length > 0) buf.append(entry.getValue());
+        }
+      }
+      return buf.toString();
+    }
+
+    public Properties toProperties() {
+      Properties props = new Properties();
+      props.putAll(this);
+      return props;
+    }
+  }
+
+  static class StringableList extends ArrayList<Path> {
+    StringableList() {
+
+    }
+
+    StringableList(String s) {
+      String[] parts = s.split(":", 2);
+      // the first field is the number of entries to read
+      int numElements = Integer.valueOf(parts[0]);
+      s = parts[1];
+      for (int i = 0; i < numElements; i++) {
+        parts = s.split(":", 2);
+        int len = Integer.valueOf(parts[0]);
+        String val = parts[1].substring(0, len);
+        s = parts[1].substring(len);
+        add(new Path(val));
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append(size());
+      buf.append(':');
+      if (size() > 0) {
+        for (Path p : this) {
+          buf.append(p.toString().length());
+          buf.append(':');
+          buf.append(p.toString());
+        }
+      }
+      return buf.toString();
+    }
+  }
+
+  private static <T> T instantiate(Class<T> classType, String classname) throws IOException {
+    T t = null;
+    try {
+      Class c = Class.forName(classname);
+      Object o = c.newInstance();
+      if (classType.isAssignableFrom(o.getClass())) {
+        t = (T)o;
+      } else {
+        String s = classname + " is not an instance of " + classType.getName();
+        LOG.error(s);
+        throw new IOException(s);
+      }
+    } catch (ClassNotFoundException e) {
+      LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
+      throw new IOException(e);
+    } catch (InstantiationException e) {
+      LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
+      throw new IOException(e);
+    }
+    return t;
+  }
+
+  static class CompactorOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      Path tmpLocation = new Path(context.getJobConf().get(TMP_LOCATION));
+      Path finalLocation = new Path(context.getJobConf().get(FINAL_LOCATION));
+      FileSystem fs = tmpLocation.getFileSystem(context.getJobConf());
+      LOG.debug("Moving contents of " + tmpLocation.toString() + " to " +
+          finalLocation.toString());
+
+      FileStatus[] contents = fs.listStatus(tmpLocation);
+      for (int i = 0; i < contents.length; i++) {
+        Path newPath = new Path(finalLocation, contents[i].getPath().getName());
+        fs.rename(contents[i].getPath(), newPath);
+      }
+      fs.delete(tmpLocation, true);
+    }
+
+    @Override
+    public void abortJob(JobContext context, int status) throws IOException {
+      Path tmpLocation = new Path(context.getJobConf().get(TMP_LOCATION));
+      FileSystem fs = tmpLocation.getFileSystem(context.getJobConf());
+      LOG.debug("Removing " + tmpLocation.toString());
+      fs.delete(tmpLocation, true);
+    }
+  }
+}
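
StringableMap and StringableList above pass table properties and directory lists through the JobConf as length-prefixed strings: an element count, then for each element its character length, a colon, and the characters themselves.  A standalone sketch of that encoding (not the committed class; the property shown is just an example, and null handling is omitted):

import java.util.LinkedHashMap;
import java.util.Map;

public class StringableMapFormatSketch {
  public static void main(String[] args) {
    Map<String, String> m = new LinkedHashMap<String, String>();
    m.put("orc.compress", "ZLIB");

    // <numEntries>: then <keyLen>:<key><valLen>:<value> for each entry,
    // mirroring what StringableMap.toString() produces above.
    StringBuilder buf = new StringBuilder().append(m.size()).append(':');
    for (Map.Entry<String, String> e : m.entrySet()) {
      buf.append(e.getKey().length()).append(':').append(e.getKey());
      buf.append(e.getValue().length()).append(':').append(e.getValue());
    }
    System.out.println(buf);   // prints 1:12:orc.compress4:ZLIB
  }
}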

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Superclass for all threads in the compactor.
+ */
+abstract class CompactorThread extends Thread implements MetaStoreThread {
+  static final private String CLASS_NAME = CompactorThread.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected HiveConf conf;
+  protected CompactionTxnHandler txnHandler;
+  protected RawStore rs;
+  protected int threadId;
+  protected BooleanPointer stop;
+
+  @Override
+  public void setHiveConf(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void setThreadId(int threadId) {
+    this.threadId = threadId;
+
+  }
+
+  @Override
+  public void init(BooleanPointer stop) throws MetaException {
+    this.stop = stop;
+    setPriority(MIN_PRIORITY);
+    setDaemon(true); // this means the process will exit without waiting for this thread
+
+    // Get our own instance of the transaction handler
+    txnHandler = new CompactionTxnHandler(conf);
+
+    // Get our own connection to the database so we can get table and partition information.
+    rs = RawStoreProxy.getProxy(conf, conf,
+        conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId);
+  }
+
+  /**
+   * Find the table being compacted
+   * @param ci compaction info returned from the compaction queue
+   * @return metastore table
+   * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found.
+   */
+  protected Table resolveTable(CompactionInfo ci) throws MetaException {
+    try {
+      return rs.getTable(ci.dbname, ci.tableName);
+    } catch (MetaException e) {
+      LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage());
+      throw e;
+    }
+  }
+
+  /**
+   * Get the partition being compacted.
+   * @param ci compaction info returned from the compaction queue
+   * @return metastore partition, or null if there is no partition in this compaction info
+   * @throws Exception if underlying calls throw, or if the partition name resolves to more than
+   * one partition.
+   */
+  protected Partition resolvePartition(CompactionInfo ci) throws Exception {
+    Partition p = null;
+    if (ci.partName != null) {
+      List<String> names = new ArrayList<String>(1);
+      names.add(ci.partName);
+      List<Partition> parts = null;
+      try {
+        parts = rs.getPartitionsByNames(ci.dbname, ci.tableName, names);
+      } catch (Exception e) {
+        LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage());
+        throw e;
+      }
+      if (parts.size() != 1) {
+        LOG.error(ci.getFullPartitionName() + " does not refer to a single partition");
+        throw new MetaException(ci.getFullPartitionName() + " does not refer to a single partition");
+      }
+      return parts.get(0);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get the storage descriptor for a compaction.
+   * @param t table from {@link #resolveTable(org.apache.hadoop.hive.metastore.txn.CompactionInfo)}
+   * @param p partition from {@link #resolvePartition(org.apache.hadoop.hive.metastore.txn.CompactionInfo)}
+   * @return metastore storage descriptor.
+   */
+  protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
+    return (p == null) ? t.getSd() : p.getSd();
+  }
+
+  /**
+   * Determine which user to run an operation as, based on the owner of the directory to be
+   * compacted.  It is asserted that either the user running the hive metastore or the table
+   * owner must be able to stat the directory and determine the owner.
+   * @param location directory that will be read or written to.
+   * @param t metastore table object
+   * @return username of the owner of the location.
+   * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat
+   * the location.
+   */
+  protected String findUserToRunAs(String location, Table t) throws IOException,
+      InterruptedException {
+    LOG.debug("Determining who to run the job as.");
+    final Path p = new Path(location);
+    final FileSystem fs = p.getFileSystem(conf);
+    try {
+      FileStatus stat = fs.getFileStatus(p);
+      LOG.debug("Running job as " + stat.getOwner());
+      return stat.getOwner();
+    } catch (AccessControlException e) {
+      // TODO not sure this is the right exception
+      LOG.debug("Unable to stat file as current user, trying as table owner");
+
+      // Now, try it as the table owner and see if we get better luck.
+      final List<String> wrapper = new ArrayList<String>(1);
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+          UserGroupInformation.getLoginUser());
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          FileStatus stat = fs.getFileStatus(p);
+          wrapper.add(stat.getOwner());
+          return null;
+        }
+      });
+
+      if (wrapper.size() == 1) {
+        LOG.debug("Running job as " + wrapper.get(0));
+        return wrapper.get(0);
+      }
+    }
+    LOG.error("Unable to stat file as either current user or table owner, giving up");
+    throw new IOException("Unable to stat file");
+  }
+
+  /**
+   * Determine whether to run this job as the current user or whether we need a doAs to switch
+   * users.
+   * @param owner of the directory we will be working in, as determined by
+   * {@link #findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)}
+   * @return true if the job should run as the current user, false if a doAs is needed.
+   */
+  protected boolean runJobAsSelf(String owner) {
+    return (owner.equals(System.getProperty("user.name")));
+  }
+
+  protected String tableName(Table t) {
+    return t.getDbName() + "." + t.getTableName();
+  }
+}
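
The run-as logic above boils down to: stat the target directory, take its owner as the user
to run file operations as, and skip the proxy-user doAs when that owner is already the
current process user (runJobAsSelf).  Below is a minimal, self-contained sketch of that
decision using only the JDK; the class name and paths are hypothetical, and the real code
uses the Hadoop FileSystem API and UserGroupInformation rather than java.nio.file.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    /** Hypothetical sketch of the owner check behind findUserToRunAs/runJobAsSelf. */
    public class RunAsSketch {
      /** The user compaction file operations should run as: the directory owner. */
      static String findUserToRunAs(Path dir) throws IOException {
        return Files.getOwner(dir).getName();
      }

      /** True when no impersonation (doAs) is needed because we already are that user. */
      static boolean runJobAsSelf(String owner) {
        return owner.equals(System.getProperty("user.name"));
      }

      public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        String owner = findUserToRunAs(dir);
        System.out.println(runJobAsSelf(owner)
            ? "run directly as " + owner
            : "doAs required to act as " + owner);
      }
    }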

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A class to initiate compactions.  This will run in a separate thread.
+ */
+public class Initiator extends CompactorThread {
+  static final private String CLASS_NAME = Initiator.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+  static final private int threadId = 10000;
+
+  static final private String NO_COMPACTION = "NO_AUTO_COMPACTION";
+
+  private long checkInterval;
+
+  @Override
+  public void run() {
+    // Make sure nothing escapes this run method and kills the metastore at large,
+    // so wrap it in a big catch Throwable statement.
+    try {
+      recoverFailedCompactions(false);
+
+      int abortedThreshold = HiveConf.getIntVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+
+      // Make sure we run through the loop once before checking to stop as this makes testing
+      // much easier.  The stop value is only for testing anyway and not used when called from
+      // HiveMetaStore.
+      do {
+        long startedAt = System.currentTimeMillis();
+
+        // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
+        // don't doom the entire thread.
+        try {
+          ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
+          ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
+          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
+          LOG.debug("Found " + potentials.size() + " potential compactions, " +
+              "checking to see if we should compact any of them");
+          for (CompactionInfo ci : potentials) {
+            LOG.debug("Checking to see if we should compact " + ci.getFullPartitionName());
+            try {
+              Table t = resolveTable(ci);
+              // check if no compaction set for this table
+              if (t.getParameters().get(NO_COMPACTION) != null) {
+                LOG.info("Table " + tableName(t) + " marked " +  NO_COMPACTION +
+                    " so we will not compact it.");
+                continue;
+              }
+
+              // Check if we already have initiated or are working on a compaction for this partition
+              // or table.  If so, skip it.  If we are just waiting on cleaning we can still check,
+              // as it may be time to compact again even though we haven't cleaned.
+              if (lookForCurrentCompactions(currentCompactions, ci)) {
+                LOG.debug("Found currently initiated or working compaction for " +
+                    ci.getFullPartitionName() + " so we will not initiate another compaction");
+                continue;
+              }
+
+              // Figure out who we should run the file operations as
+              Partition p = resolvePartition(ci);
+              StorageDescriptor sd = resolveStorageDescriptor(t, p);
+              String runAs = findUserToRunAs(sd.getLocation(), t);
+
+              CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
+              if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
+            } catch (Throwable t) {
+              LOG.error("Caught exception while trying to determine if we should compact " +
+                  ci.getFullPartitionName() + ".  Marking clean to avoid repeated failures, " +
+                  "" + StringUtils.stringifyException(t));
+              txnHandler.markCleaned(ci);
+            }
+          }
+
+          // Check for timed out remote workers.
+          recoverFailedCompactions(true);
+
+          // Clean anything from the txns table that has no components left in txn_components.
+          txnHandler.cleanEmptyAbortedTxns();
+        } catch (Throwable t) {
+          LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
+              StringUtils.stringifyException(t));
+        }
+
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        if (elapsedTime >= checkInterval || stop.boolVal)  continue;
+        else Thread.sleep(checkInterval - elapsedTime);
+
+      } while (!stop.boolVal);
+    } catch (Throwable t) {
+      LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
+          StringUtils.stringifyException(t));
+    }
+  }
+
+  @Override
+  public void init(BooleanPointer stop) throws MetaException {
+    super.init(stop);
+    checkInterval =
+        HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000;
+  }
+
+  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
+    if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname());
+    txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT));
+  }
+
+  // Figure out if there are any currently running compactions on the same table or partition.
+  private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
+                                            CompactionInfo ci) {
+    if (compactions.getCompacts() != null) {
+      for (ShowCompactResponseElement e : compactions.getCompacts()) {
+        if (!e.getState().equals(TxnHandler.CLEANING_RESPONSE) &&
+            e.getDbname().equals(ci.dbname) &&
+            e.getTablename().equals(ci.tableName) &&
+            (e.getPartitionname() == null ? ci.partName == null
+                  : e.getPartitionname().equals(ci.partName))) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private CompactionType checkForCompaction(final CompactionInfo ci,
+                                            final ValidTxnList txns,
+                                            final StorageDescriptor sd,
+                                            final String runAs)
+      throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " +
+          "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+    if (runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, txns, sd);
+    } else {
+      LOG.info("Going to initiate as user " + runAs);
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
+        UserGroupInformation.getLoginUser());
+      return ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
+        @Override
+        public CompactionType run() throws Exception {
+          return determineCompactionType(ci, txns, sd);
+        }
+      });
+    }
+  }
+
+  private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
+                                                 StorageDescriptor sd)
+      throws IOException, InterruptedException {
+    boolean noBase = false;
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
+    Path base = dir.getBaseDirectory();
+    long baseSize = 0;
+    FileStatus stat = null;
+    if (base != null) {
+      stat = fs.getFileStatus(base);
+      if (!stat.isDir()) {
+        LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!");
+        return null;
+      }
+      baseSize = sumDirSize(fs, base);
+    }
+
+    List<FileStatus> originals = dir.getOriginalFiles();
+    for (FileStatus origStat : originals) {
+      baseSize += origStat.getLen();
+    }
+
+    long deltaSize = 0;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    for (AcidUtils.ParsedDelta delta : deltas) {
+      stat = fs.getFileStatus(delta.getPath());
+      if (!stat.isDir()) {
+        LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " +
+            "but it's a file!");
+        return null;
+      }
+      deltaSize += sumDirSize(fs, delta.getPath());
+    }
+
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      float deltaPctThreshold = HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      boolean bigEnough =   (float)deltaSize/(float)baseSize > deltaPctThreshold;
+      if (LOG.isDebugEnabled()) {
+        StringBuffer msg = new StringBuffer("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" will major compact: ");
+        msg.append(bigEnough);
+        LOG.debug(msg);
+      }
+      if (bigEnough) return CompactionType.MAJOR;
+    }
+
+    int deltaNumThreshold = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (enough) {
+      LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
+          (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
+          " compaction");
+      // If there's no base file, do a major compaction
+      return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
+    }
+    return null;
+  }
+
+  private long sumDirSize(FileSystem fs, Path dir) throws IOException {
+    long size = 0;
+    FileStatus[] buckets = fs.listStatus(dir);
+    for (int i = 0; i < buckets.length; i++) {
+      size += buckets[i].getLen();
+    }
+    return size;
+  }
+
+  private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws MetaException {
+    String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName();
+    LOG.info(s);
+    CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type);
+    if (ci.partName != null) rqst.setPartitionname(ci.partName);
+    rqst.setRunas(runAs);
+    txnHandler.compact(rqst);
+  }
+}
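
The size arithmetic in determineCompactionType() above reduces to two thresholds read from
HiveConf: HIVE_COMPACTOR_DELTA_PCT_THRESHOLD (delta bytes relative to base bytes, triggering
a major compaction) and HIVE_COMPACTOR_DELTA_NUM_THRESHOLD (number of delta directories,
triggering a minor compaction, promoted to major when no base exists).  The following is a
minimal restatement of that decision with the thresholds passed as plain parameters; the
class name, enum, and example numbers are illustrative only.

    /** Illustrative restatement of the Initiator's compaction-type decision. */
    public class CompactionDecisionSketch {
      enum CompactionType { MAJOR, MINOR }   // stand-in for the metastore enum

      /**
       * @param baseSize   bytes in the base plus any original files, 0 if none
       * @param deltaSize  total bytes across delta directories
       * @param deltaCount number of delta directories
       * @return the compaction to request, or null if none is warranted yet
       */
      static CompactionType decide(long baseSize, long deltaSize, int deltaCount,
                                   float deltaPctThreshold, int deltaNumThreshold) {
        boolean noBase = (baseSize == 0 && deltaSize > 0);
        if (!noBase && baseSize > 0
            && (float) deltaSize / (float) baseSize > deltaPctThreshold) {
          return CompactionType.MAJOR;       // deltas are large relative to the base
        }
        if (deltaCount > deltaNumThreshold) {
          return noBase ? CompactionType.MAJOR : CompactionType.MINOR;  // too many deltas
        }
        return null;                         // below both thresholds
      }

      public static void main(String[] args) {
        System.out.println(decide(1000, 200, 3, 0.1f, 10));  // MAJOR: 20% of base > 10%
        System.out.println(decide(1000, 50, 12, 0.1f, 10));  // MINOR: 12 deltas > 10
        System.out.println(decide(0, 50, 12, 0.1f, 10));     // MAJOR: many deltas, no base
        System.out.println(decide(1000, 50, 3, 0.1f, 10));   // null: nothing to do yet
      }
    }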

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A class to do compactions.  This will run in a separate thread.  It will spin on the
+ * compaction queue and look for new work to do.
+ */
+public class Worker extends CompactorThread {
+  static final private String CLASS_NAME = Worker.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+  static final private long SLEEP_TIME = 5000;
+  static final private int baseThreadNum = 10002;
+
+  private String name;
+
+  /**
+   * Get the hostname that this worker is run on.  Made static and public so that other classes
+   * can use the same method to know what host their worker threads are running on.
+   * @return hostname
+   */
+  public static String hostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Unable to resolve my host name " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void run() {
+    // Make sure nothing escapes this run method and kills the metastore at large,
+    // so wrap it in a big catch Throwable statement.
+    try {
+      do {
+        CompactionInfo ci = txnHandler.findNextToCompact(name);
+
+        if (ci == null) {
+          if (stop.boolVal) break;
+          try {
+            Thread.sleep(SLEEP_TIME);
+          } catch (InterruptedException e) {
+            LOG.warn("Worker thread sleep interrupted " + e.getMessage());
+          }
+          continue;
+        }
+
+        // Find the table we will be working with.
+        Table t1 = null;
+        try {
+          t1 = resolveTable(ci);
+        } catch (MetaException e) {
+          txnHandler.markCleaned(ci);
+          continue;
+        }
+        // This chicanery is to get around the fact that the table needs to be final in order to
+        // go into the doAs below.
+        final Table t = t1;
+
+        // Find the partition we will be working with, if there is one.
+        Partition p = null;
+        try {
+          p = resolvePartition(ci);
+        } catch (Exception e) {
+          txnHandler.markCleaned(ci);
+          continue;
+        }
+
+        // Find the appropriate storage descriptor
+        final StorageDescriptor sd =  resolveStorageDescriptor(t, p);
+
+        // Check that the table or partition isn't sorted, as we don't yet support that.
+        if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
+          LOG.error("Attempt to compact sorted table, which is not yet supported!");
+          txnHandler.markCleaned(ci);
+          continue;
+        }
+
+        final boolean isMajor = (ci.type == CompactionType.MAJOR);
+        final ValidTxnList txns =
+            TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
+        final StringBuffer jobName = new StringBuffer(name);
+        jobName.append("-compactor-");
+        jobName.append(ci.getFullPartitionName());
+
+        // Determine who to run as
+        String runAs;
+        if (ci.runAs == null) {
+          runAs = findUserToRunAs(sd.getLocation(), t);
+          txnHandler.setRunAs(ci.id, runAs);
+        } else {
+          runAs = ci.runAs;
+        }
+
+        LOG.info("Starting " + ci.type.toString() + " compaction for " +
+            ci.getFullPartitionName());
+
+        final CompactorMR mr = new CompactorMR();
+        try {
+          if (runJobAsSelf(runAs)) {
+            mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+          } else {
+            UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+              UserGroupInformation.getLoginUser());
+            ugi.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+                return null;
+              }
+            });
+          }
+          txnHandler.markCompacted(ci);
+        } catch (Exception e) {
+          LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() +
+              ".  Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
+          txnHandler.markCleaned(ci);
+        }
+      } while (!stop.boolVal);
+    } catch (Throwable t) {
+      LOG.error("Caught an exception in the main loop of compactor worker " + name +
+          ", exiting " + StringUtils.stringifyException(t));
+    }
+  }
+
+  @Override
+  public void init(BooleanPointer stop) throws MetaException {
+    super.init(stop);
+
+    StringBuffer name = new StringBuffer(hostname());
+    name.append("-");
+    name.append(getId());
+    this.name = name.toString();
+    setName(name.toString());
+  }
+
+}
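
The Worker above follows a poll/sleep/stop pattern: fetch one unit of work, back off when
the queue is empty, re-check the shared stop flag every pass, and never let a per-item
failure or any other Throwable escape run() and take the metastore down with it.  Below is
a generic, self-contained skeleton of that pattern, using an AtomicBoolean in place of
MetaStoreThread.BooleanPointer and an in-memory queue in place of the compaction queue; all
names here are illustrative.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    /** Generic sketch of the Worker's poll/sleep/stop loop. */
    public class PollingWorkerSketch extends Thread {
      private static final long SLEEP_TIME_MS = 5000;

      private final Queue<String> workQueue;   // stand-in for the compaction queue
      private final AtomicBoolean stop;        // stand-in for MetaStoreThread.BooleanPointer

      PollingWorkerSketch(Queue<String> workQueue, AtomicBoolean stop) {
        this.workQueue = workQueue;
        this.stop = stop;
        setDaemon(true);                       // do not keep the process alive for this thread
      }

      @Override
      public void run() {
        try {
          do {
            String item = workQueue.poll();
            if (item == null) {
              if (stop.get()) break;           // no work and asked to stop: exit cleanly
              Thread.sleep(SLEEP_TIME_MS);     // no work yet: back off before polling again
              continue;
            }
            try {
              System.out.println(getName() + " compacting " + item);
            } catch (Exception e) {
              // a per-item failure must not kill the thread; log it and move on
              System.err.println("failed on " + item + ": " + e);
            }
          } while (!stop.get());
        } catch (Throwable t) {
          // nothing may escape run() in a long-lived daemon thread
          System.err.println("worker exiting: " + t);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Queue<String> q = new ConcurrentLinkedQueue<String>();
        q.add("default.t1");
        AtomicBoolean stop = new AtomicBoolean(false);
        PollingWorkerSketch w = new PollingWorkerSketch(q, stop);
        w.start();
        Thread.sleep(200);     // let it drain the queue
        stop.set(true);
        w.join(10000);         // the worker wakes from its sleep and sees the stop flag
      }
    }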